diff --git a/django_graphql_playground/apps/pubsub/services/consumer.py b/django_graphql_playground/apps/pubsub/services/consumer.py index d52df26..69877a3 100644 --- a/django_graphql_playground/apps/pubsub/services/consumer.py +++ b/django_graphql_playground/apps/pubsub/services/consumer.py @@ -1,5 +1,6 @@ import json import logging +import ssl import time import uuid from dataclasses import dataclass @@ -83,6 +84,13 @@ def start(self, block=True, testing=False): self._connection.connect(**self._connection_configuration) self._connection.subscribe(id=self._subscription_id, **self._subscription_configuration) + if block: + while True: + if not self.is_open(): + logger.info("It is not open. Starting...") + self.start(block=False) + time.sleep(1) + if block: while True: # https://stackoverflow.com/a/529052/3899136 @@ -93,12 +101,19 @@ def close(self): def build_listener(destination_name, callback, ack_type=Acknowledgements.CLIENT, **connection_params) -> _Listener: - logger.debug("Building listener for %s...", destination_name) + logger.info("Building listener...") hosts = [(connection_params.get("host"), connection_params.get("port"))] - # http://stomp.github.io/stomp-specification-1.2.html#Heart-beating - conn = stomp.Connection(hosts, heartbeats=(10000, 10000)) + use_ssl = connection_params.get("use_ssl", False) + ssl_version = connection_params.get("ssl_version", ssl.PROTOCOL_TLS) + logger.info(f"Use SSL? {use_ssl}. Version: {ssl_version}") + conn = stomp.Connection(hosts, ssl_version=ssl_version, use_ssl=use_ssl) client_id = connection_params.get("client_id", uuid.uuid4()) subscription_configuration = {"destination": destination_name, "ack": ack_type.value} - connection_configuration = {"wait": True, "headers": {"client-id": f"{client_id}-listener"}} + connection_configuration = { + "username": connection_params.get("username"), + "passcode": connection_params.get("password"), + "wait": True, + "headers": {"client-id": f"{client_id}-listener"}, + } listener = _Listener(conn, callback, subscription_configuration, connection_configuration) return listener diff --git a/django_graphql_playground/apps/pubsub/services/producer.py b/django_graphql_playground/apps/pubsub/services/producer.py index 2212b8c..166f02f 100644 --- a/django_graphql_playground/apps/pubsub/services/producer.py +++ b/django_graphql_playground/apps/pubsub/services/producer.py @@ -1,5 +1,6 @@ import json import logging +import ssl import uuid from contextlib import contextmanager from typing import Dict @@ -16,6 +17,7 @@ def __init__(self, connection: StompConnection11, connection_configuration: Dict self._connection_configuration = connection_configuration self._connection = connection self._destination_name = destination_name + # TODO: Make Content-Type dynamic self._default_content_type = "application/json;charset=utf-8" def is_open(self): @@ -24,9 +26,11 @@ def is_open(self): def start(self): self._connection.start() self._connection.connect(**self._connection_configuration) + logger.info("Connected") def close(self): self._connection.disconnect() + logger.info("Disconnected") def send(self, body, headers=None): if hasattr(self, "_tmp_transaction_id"): @@ -38,6 +42,9 @@ def send(self, body, headers=None): transaction=self._tmp_transaction_id, ) else: + if not self.is_open(): + logger.info("It is not open. Starting...") + self.start() self._connection.send( self._destination_name, body=json.dumps(body, cls=DjangoJSONEncoder), @@ -47,11 +54,19 @@ def send(self, body, headers=None): def build_publisher(destination_name, **connection_params) -> _Publisher: - logger.debug("Building publisher for %s...", destination_name) + logger.info("Building publisher...") hosts = [(connection_params.get("host"), connection_params.get("port"))] + use_ssl = connection_params.get("use_ssl", False) + ssl_version = connection_params.get("ssl_version", ssl.PROTOCOL_TLS) + logger.info(f"Use SSL? {use_ssl}. Version: {ssl_version}") client_id = connection_params.get("client_id", uuid.uuid4()) - connection_configuration = {"wait": True, "headers": {"client-id": f"{client_id}-publisher"}} - conn = stomp.Connection(hosts) + connection_configuration = { + "username": connection_params.get("username"), + "passcode": connection_params.get("password"), + "wait": True, + "headers": {"client-id": f"{client_id}-publisher"}, + } + conn = stomp.Connection(hosts, ssl_version=ssl_version, use_ssl=use_ssl) publisher = _Publisher(conn, connection_configuration, destination_name) return publisher @@ -62,7 +77,8 @@ def auto_open_close_connection(publisher: _Publisher): publisher.start() yield finally: - publisher.close() + if publisher.is_open(): + publisher.close() @contextmanager @@ -73,9 +89,9 @@ def do_inside_transaction(publisher: _Publisher): setattr(publisher, "_tmp_transaction_id", transaction_id) yield publisher._connection.commit(getattr(publisher, "_tmp_transaction_id")) - except: - logger.exception("Could not conclude transaction properly") - publisher._connection.abort(getattr(publisher, "_tmp_transaction_id")) finally: - if hasattr(publisher, "_tmp_transaction_id"): - delattr(publisher, "_tmp_transaction_id") + try: + publisher._connection.abort(getattr(publisher, "_tmp_transaction_id")) + finally: + if hasattr(publisher, "_tmp_transaction_id"): + delattr(publisher, "_tmp_transaction_id")