Skip to content

Commit

Permalink
Updated STOMP consumer and producer services
Browse files Browse the repository at this point in the history
  • Loading branch information
willianantunes committed May 26, 2019
1 parent 82a3990 commit 4bd160d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
23 changes: 19 additions & 4 deletions django_graphql_playground/apps/pubsub/services/consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import ssl
import time
import uuid
from dataclasses import dataclass
Expand Down Expand Up @@ -83,6 +84,13 @@ def start(self, block=True, testing=False):
self._connection.connect(**self._connection_configuration)
self._connection.subscribe(id=self._subscription_id, **self._subscription_configuration)

if block:
while True:
if not self.is_open():
logger.info("It is not open. Starting...")
self.start(block=False)
time.sleep(1)

if block:
while True:
# https://stackoverflow.com/a/529052/3899136
Expand All @@ -93,12 +101,19 @@ def close(self):


def build_listener(destination_name, callback, ack_type=Acknowledgements.CLIENT, **connection_params) -> _Listener:
logger.debug("Building listener for %s...", destination_name)
logger.info("Building listener...")
hosts = [(connection_params.get("host"), connection_params.get("port"))]
# http://stomp.github.io/stomp-specification-1.2.html#Heart-beating
conn = stomp.Connection(hosts, heartbeats=(10000, 10000))
use_ssl = connection_params.get("use_ssl", False)
ssl_version = connection_params.get("ssl_version", ssl.PROTOCOL_TLS)
logger.info(f"Use SSL? {use_ssl}. Version: {ssl_version}")
conn = stomp.Connection(hosts, ssl_version=ssl_version, use_ssl=use_ssl)
client_id = connection_params.get("client_id", uuid.uuid4())
subscription_configuration = {"destination": destination_name, "ack": ack_type.value}
connection_configuration = {"wait": True, "headers": {"client-id": f"{client_id}-listener"}}
connection_configuration = {
"username": connection_params.get("username"),
"passcode": connection_params.get("password"),
"wait": True,
"headers": {"client-id": f"{client_id}-listener"},
}
listener = _Listener(conn, callback, subscription_configuration, connection_configuration)
return listener
34 changes: 25 additions & 9 deletions django_graphql_playground/apps/pubsub/services/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import ssl
import uuid
from contextlib import contextmanager
from typing import Dict
Expand All @@ -16,6 +17,7 @@ def __init__(self, connection: StompConnection11, connection_configuration: Dict
self._connection_configuration = connection_configuration
self._connection = connection
self._destination_name = destination_name
# TODO: Make Content-Type dynamic
self._default_content_type = "application/json;charset=utf-8"

def is_open(self):
Expand All @@ -24,9 +26,11 @@ def is_open(self):
def start(self):
self._connection.start()
self._connection.connect(**self._connection_configuration)
logger.info("Connected")

def close(self):
self._connection.disconnect()
logger.info("Disconnected")

def send(self, body, headers=None):
if hasattr(self, "_tmp_transaction_id"):
Expand All @@ -38,6 +42,9 @@ def send(self, body, headers=None):
transaction=self._tmp_transaction_id,
)
else:
if not self.is_open():
logger.info("It is not open. Starting...")
self.start()
self._connection.send(
self._destination_name,
body=json.dumps(body, cls=DjangoJSONEncoder),
Expand All @@ -47,11 +54,19 @@ def send(self, body, headers=None):


def build_publisher(destination_name, **connection_params) -> _Publisher:
logger.debug("Building publisher for %s...", destination_name)
logger.info("Building publisher...")
hosts = [(connection_params.get("host"), connection_params.get("port"))]
use_ssl = connection_params.get("use_ssl", False)
ssl_version = connection_params.get("ssl_version", ssl.PROTOCOL_TLS)
logger.info(f"Use SSL? {use_ssl}. Version: {ssl_version}")
client_id = connection_params.get("client_id", uuid.uuid4())
connection_configuration = {"wait": True, "headers": {"client-id": f"{client_id}-publisher"}}
conn = stomp.Connection(hosts)
connection_configuration = {
"username": connection_params.get("username"),
"passcode": connection_params.get("password"),
"wait": True,
"headers": {"client-id": f"{client_id}-publisher"},
}
conn = stomp.Connection(hosts, ssl_version=ssl_version, use_ssl=use_ssl)
publisher = _Publisher(conn, connection_configuration, destination_name)
return publisher

Expand All @@ -62,7 +77,8 @@ def auto_open_close_connection(publisher: _Publisher):
publisher.start()
yield
finally:
publisher.close()
if publisher.is_open():
publisher.close()


@contextmanager
Expand All @@ -73,9 +89,9 @@ def do_inside_transaction(publisher: _Publisher):
setattr(publisher, "_tmp_transaction_id", transaction_id)
yield
publisher._connection.commit(getattr(publisher, "_tmp_transaction_id"))
except:
logger.exception("Could not conclude transaction properly")
publisher._connection.abort(getattr(publisher, "_tmp_transaction_id"))
finally:
if hasattr(publisher, "_tmp_transaction_id"):
delattr(publisher, "_tmp_transaction_id")
try:
publisher._connection.abort(getattr(publisher, "_tmp_transaction_id"))
finally:
if hasattr(publisher, "_tmp_transaction_id"):
delattr(publisher, "_tmp_transaction_id")

0 comments on commit 4bd160d

Please sign in to comment.