diff --git a/nsq/client.py b/nsq/client.py index 47bd405..3ea9df3 100644 --- a/nsq/client.py +++ b/nsq/client.py @@ -3,15 +3,16 @@ import time import logging -import tornado.ioloop +from tornado.ioloop import IOLoop, PeriodicCallback logger = logging.getLogger(__name__) class Client(object): def __init__(self, **kwargs): - self.io_loop = tornado.ioloop.IOLoop.current() - tornado.ioloop.PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000).start() + self.io_loop = IOLoop.current() + self.periodic_check = PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000) + self.periodic_check.start() def _on_connection_identify(self, conn, data, **kwargs): logger.info('[%s:%s] IDENTIFY sent %r' % (conn.id, self.name, data)) @@ -75,3 +76,6 @@ def heartbeat(self, conn): :param conn: the :class:`nsq.AsyncConn` over which the heartbeat was received """ pass + + def close(self): + self.periodic_check.stop() diff --git a/nsq/reader.py b/nsq/reader.py index 0c24245..02bf193 100644 --- a/nsq/reader.py +++ b/nsq/reader.py @@ -221,6 +221,7 @@ def __init__( self.backoff_block = False self.backoff_block_completed = True + self._closed = False self.conns = {} self.connection_attempts = {} self.http_client = tornado.httpclient.AsyncHTTPClient() @@ -264,8 +265,10 @@ def _run(self): def close(self): """ - Closes all connections stops all periodic callbacks + Closes all connections and stops all periodic callbacks """ + self._closed = True + for conn in self.conns.values(): conn.close() @@ -273,6 +276,8 @@ def close(self): if self.query_periodic is not None: self.query_periodic.stop() + super(Reader, self).close() + def set_message_handler(self, message_handler): """ Assigns the callback method to be executed for each message received @@ -494,8 +499,8 @@ def connect_to_nsqd(self, host, port): # only attempt to re-connect once every 10s per destination # this throttles reconnects to failed endpoints now = time.time() - last_connect_attempt = self.connection_attempts.get(conn.id) - if last_connect_attempt and last_connect_attempt > now - 10: + last_connect_attempt = self.connection_attempts.get(conn.id, 0) + if last_connect_attempt > now - 10: return self.connection_attempts[conn.id] = now @@ -559,6 +564,9 @@ def _on_connection_close(self, conn, **kwargs): self.io_loop.remove_timeout(conn.rdy_timeout) conn.rdy_timeout = None + if self._closed: + return + if not self.lookupd_http_addresses: # automatically reconnect to nsqd addresses when not using lookupd logger.info('[%s:%s] attempting to reconnect in 15s', conn.id, self.name) diff --git a/nsq/writer.py b/nsq/writer.py index bfecbb7..f3a8ad7 100644 --- a/nsq/writer.py +++ b/nsq/writer.py @@ -104,6 +104,7 @@ def __init__(self, nsqd_tcp_addresses, reconnect_interval=15.0, name=None, **kwa self.name = name or nsqd_tcp_addresses[0] self.nsqd_tcp_addresses = nsqd_tcp_addresses + self._closed = False self.conns = {} # Verify keyword arguments @@ -245,6 +246,9 @@ def _on_connection_close(self, conn, **kwargs): logger.exception('[%s] uncaught exception in callback', conn.id) logger.warning('[%s] connection closed', conn.id) + if self._closed: + return + logger.info('[%s] attempting to reconnect in %0.2fs', conn.id, self.reconnect_interval) reconnect_callback = functools.partial(self.connect_to_nsqd, host=conn.host, port=conn.port) @@ -254,3 +258,14 @@ def _finish_pub(self, conn, data, command, topic, msg): if isinstance(data, protocol.Error): logger.error('[%s] failed to %s (%s, %s), data is %s', conn.id if conn else 'NA', command, topic, msg, data) + + def close(self): + """ + Closes all connections and stops all periodic callbacks + """ + self._closed = True + + for conn in self.conns.values(): + conn.close() + + super(Writer, self).close()