Skip to content

Commit

Permalink
Made exchange durable. Always retry after disconnect.
Browse files Browse the repository at this point in the history
Fixes #100
  • Loading branch information
xarg committed Jul 26, 2017
1 parent 97f4faa commit ce48609
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions socketio/kombu_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _connection(self):
return kombu.Connection(self.url)

def _exchange(self):
return kombu.Exchange(self.channel, type='fanout', durable=False)
return kombu.Exchange(self.channel, type='fanout')

def _queue(self):
queue_name = 'flask-socketio.' + str(uuid.uuid4())
Expand All @@ -76,13 +76,24 @@ def _queue(self):
def _producer(self):
return self._connection().Producer(exchange=self._exchange())

def __error_callback(self, exception, interval):
self.server.logger.exception('Sleeping {}s'.format(interval))

def _publish(self, data):
self.producer.publish(pickle.dumps(data))
connection = self._connection()
publish = connection.ensure(self.producer, self.producer.publish, errback=self.__error_callback)
publish(pickle.dumps(data))

def _listen(self):
reader_queue = self._queue()
with self._connection().SimpleQueue(reader_queue) as queue:
while True:
message = queue.get(block=True)
message.ack()
yield message.payload

while True:
connection = self._connection().ensure_connection(errback=self.__error_callback)
try:
with connection.SimpleQueue(reader_queue) as queue:
while True:
message = queue.get(block=True)
message.ack()
yield message.payload
except connection.connection_errors:
self.server.logger.exception("Connection error while reading from queue")

0 comments on commit ce48609

Please sign in to comment.