Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Writer.close() and Client.close(), improve Reader.close() #245

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions nsq/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import time
import logging

import tornado.ioloop
from tornado.ioloop import IOLoop, PeriodicCallback

logger = logging.getLogger(__name__)


class Client(object):
def __init__(self, **kwargs):
self.io_loop = tornado.ioloop.IOLoop.current()
tornado.ioloop.PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000).start()
self.io_loop = IOLoop.current()
self.periodic_check = PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000)
self.periodic_check.start()

def _on_connection_identify(self, conn, data, **kwargs):
logger.info('[%s:%s] IDENTIFY sent %r' % (conn.id, self.name, data))
Expand Down Expand Up @@ -75,3 +76,6 @@ def heartbeat(self, conn):
:param conn: the :class:`nsq.AsyncConn` over which the heartbeat was received
"""
pass

def close(self):
self.periodic_check.stop()
14 changes: 11 additions & 3 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ def __init__(
self.backoff_block = False
self.backoff_block_completed = True

self._closed = False
self.conns = {}
self.connection_attempts = {}
self.http_client = tornado.httpclient.AsyncHTTPClient()
Expand Down Expand Up @@ -264,15 +265,19 @@ def _run(self):

def close(self):
"""
Closes all connections stops all periodic callbacks
Closes all connections and stops all periodic callbacks
"""
self._closed = True

for conn in self.conns.values():
conn.close()

self.redist_periodic.stop()
if self.query_periodic is not None:
self.query_periodic.stop()

super(Reader, self).close()

def set_message_handler(self, message_handler):
"""
Assigns the callback method to be executed for each message received
Expand Down Expand Up @@ -494,8 +499,8 @@ def connect_to_nsqd(self, host, port):
# only attempt to re-connect once every 10s per destination
# this throttles reconnects to failed endpoints
now = time.time()
last_connect_attempt = self.connection_attempts.get(conn.id)
if last_connect_attempt and last_connect_attempt > now - 10:
last_connect_attempt = self.connection_attempts.get(conn.id, 0)
if last_connect_attempt > now - 10:
return
self.connection_attempts[conn.id] = now

Expand Down Expand Up @@ -559,6 +564,9 @@ def _on_connection_close(self, conn, **kwargs):
self.io_loop.remove_timeout(conn.rdy_timeout)
conn.rdy_timeout = None

if self._closed:
return

if not self.lookupd_http_addresses:
# automatically reconnect to nsqd addresses when not using lookupd
logger.info('[%s:%s] attempting to reconnect in 15s', conn.id, self.name)
Expand Down
15 changes: 15 additions & 0 deletions nsq/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self, nsqd_tcp_addresses, reconnect_interval=15.0, name=None, **kwa

self.name = name or nsqd_tcp_addresses[0]
self.nsqd_tcp_addresses = nsqd_tcp_addresses
self._closed = False
self.conns = {}

# Verify keyword arguments
Expand Down Expand Up @@ -245,6 +246,9 @@ def _on_connection_close(self, conn, **kwargs):
logger.exception('[%s] uncaught exception in callback', conn.id)

logger.warning('[%s] connection closed', conn.id)
if self._closed:
return

logger.info('[%s] attempting to reconnect in %0.2fs', conn.id, self.reconnect_interval)
reconnect_callback = functools.partial(self.connect_to_nsqd,
host=conn.host, port=conn.port)
Expand All @@ -254,3 +258,14 @@ def _finish_pub(self, conn, data, command, topic, msg):
if isinstance(data, protocol.Error):
logger.error('[%s] failed to %s (%s, %s), data is %s',
conn.id if conn else 'NA', command, topic, msg, data)

def close(self):
"""
Closes all connections and stops all periodic callbacks
"""
self._closed = True

for conn in self.conns.values():
conn.close()

super(Writer, self).close()