Skip to content

Commit

Permalink
Entire new App to do a playground with STOMP through ActiveMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
willianantunes committed May 19, 2019
1 parent 113981c commit f6272b3
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 2 deletions.
Empty file.
5 changes: 5 additions & 0 deletions django_graphql_playground/apps/pubsub/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from django.apps import AppConfig


class PubsubConfig(AppConfig):
name = "django_graphql_playground.apps.pubsub"
6 changes: 6 additions & 0 deletions django_graphql_playground/apps/pubsub/exceps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class CorrelationIdMustBeSetException(Exception):
pass


class FormatNotValidaException(Exception):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logging

from django.core.management.base import BaseCommand

from django_graphql_playground.apps.core.models import Category
from django_graphql_playground.apps.core.models import Ingredient
from django_graphql_playground.apps.pubsub.apps import PubsubConfig
from django_graphql_playground.apps.pubsub.exceps import CorrelationIdMustBeSetException
from django_graphql_playground.apps.pubsub.exceps import FormatNotValidaException
from django_graphql_playground.apps.pubsub.services import consumer
from django_graphql_playground.apps.pubsub.services.consumer import Payload
from django_graphql_playground.settings import MY_DESTINATION
from django_graphql_playground.settings import STOMP_SERVER_HOST
from django_graphql_playground.settings import STOMP_SERVER_PORT
from django_graphql_playground.support.django_helpers import make_sure_database_is_usable
from django_graphql_playground.support.log import do_log_with_correlation_id

logger = logging.getLogger(__name__)


def _listener_callback(payload: Payload):
make_sure_database_is_usable()

correlation_id = payload.headers.get("correlation-id")

if not correlation_id:
raise CorrelationIdMustBeSetException

logger.info("A message arrived! Initializing logic...")

with do_log_with_correlation_id(correlation_id):
try:
body = payload.body

if body.get("categories"):
logger.info("Persisting categories...")
for category in body["categories"]:
Category.objects.create(**category)
elif body.get("ingredients"):
logger.info("Persisting ingredients...")
for ingredient in body["ingredients"]:
Ingredient.objects.create(**ingredient)
else:
raise FormatNotValidaException

payload.ack()
except:
logger.exception(f"The following payload could not be consumed: {payload}")
payload.nack()


connection_params = {"host": STOMP_SERVER_HOST, "port": STOMP_SERVER_PORT, "client_id": PubsubConfig.name}
listener = consumer.build_listener(MY_DESTINATION, _listener_callback, **connection_params)


class Command(BaseCommand):
help = "Start App consumer"

def handle(self, *args, **options):
try:
logger.info(f"Starting listener...")
listener.start()
except BaseException as e:
logger.exception(f"A exception of type {type(e)} was captured during listener logic")
finally:
logger.info(f"Trying to close listener...")
listener.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging
import uuid

from django.core.management.base import BaseCommand
from django.db import transaction
from django.utils import timezone

from django_graphql_playground.apps.core.models import Category
from django_graphql_playground.apps.pubsub.apps import PubsubConfig
from django_graphql_playground.apps.pubsub.services import producer
from django_graphql_playground.settings import STOMP_SERVER_HOST
from django_graphql_playground.settings import STOMP_SERVER_PORT
from django_graphql_playground.settings import TARGET_DESTINATION

logger = logging.getLogger(__name__)

connection_params = {"host": STOMP_SERVER_HOST, "port": STOMP_SERVER_PORT, "client_id": PubsubConfig.name}
capitol_publisher = producer.build_publisher(TARGET_DESTINATION, **connection_params)


class Command(BaseCommand):
help = "Start App producer"

def handle(self, *args, **options):
logger.info("Getting all categories to publish them...")
categories = Category.objects.filter(end_at__lt=timezone.now(), distributed_at__isnull=True).values()

logger.info(f"There are {categories.count()} categories to be sent")

correlation_id = uuid.uuid4()
logger.info(f"Correlation ID created: {correlation_id}")
standard_header = {"correlation-id": correlation_id}

with transaction.atomic():
with producer.do_inside_transaction(capitol_publisher):
for category in categories:
capitol_publisher.send(category, standard_header)
categories.update(distributed_at=timezone.now())
94 changes: 94 additions & 0 deletions django_graphql_playground/apps/pubsub/services/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import json
import logging
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Callable
from typing import Dict

import stomp
from stomp.connect import StompConnection11

logger = logging.getLogger(__name__)


class Acknowledgements(Enum):
"""
See more details at:
- https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE_ack_Header
- https://jasonrbriggs.github.io/stomp.py/api.html#acks-and-nacks
"""

CLIENT = "client"
CLIENT_INDIVIDUAL = "client-individual"
AUTO = "auto"


@dataclass(frozen=True)
class Payload:
ack: Callable
nack: Callable
headers: Dict
body: Dict


class _Listener(stomp.ConnectionListener):
def __init__(
self,
connection: StompConnection11,
callback: callable,
subscription_configuration: Dict,
connection_configuration: Dict,
) -> None:
self._subscription_configuration = subscription_configuration
self._connection_configuration = connection_configuration
self._connection = connection
self._callback = callback
self._subscription_id = str(uuid.uuid4())
self._listener_id = str(uuid.uuid4())

def on_message(self, headers, message):
message_id = headers["message-id"]
logger.info(f"Message ID: {message_id}")
logger.debug("Received headers: %s", headers)
logger.debug("Received message: %s", message)

# https://jasonrbriggs.github.io/stomp.py/api.html#acks-and-nacks
def ack_logic():
self._connection.ack(message_id, self._subscription_id)

def nack_logic():
self._connection.nack(message_id, self._subscription_id)

self._callback(Payload(ack_logic, nack_logic, headers, json.loads(message)))

def is_open(self):
return self._connection.is_connected()

def start(self):
logger.info(f"Starting listener with name: {self._listener_id}")
logger.info(f"Subscribe/Listener auto-generated ID: {self._subscription_id}")

self._connection.set_listener(self._listener_id, self)
self._connection.start()
self._connection.connect(**self._connection_configuration)
self._connection.subscribe(id=self._subscription_id, **self._subscription_configuration)
while True:
# https://stackoverflow.com/a/529052/3899136
time.sleep(1)

def close(self):
self._connection.disconnect()


def build_listener(destination_name, callback, ack_type=Acknowledgements.CLIENT, **connection_params) -> _Listener:
logger.debug("Building listener for %s...", destination_name)
hosts = [(connection_params.get("host"), connection_params.get("port"))]
# http://stomp.github.io/stomp-specification-1.2.html#Heart-beating
conn = stomp.Connection(hosts, heartbeats=(10000, 10000))
client_id = connection_params.get("client_id", uuid.uuid4())
subscription_configuration = {"destination": destination_name, "ack": ack_type.value}
connection_configuration = {"wait": True, "headers": {"client-id": f"{client_id}-listener"}}
listener = _Listener(conn, callback, subscription_configuration, connection_configuration)
return listener
77 changes: 77 additions & 0 deletions django_graphql_playground/apps/pubsub/services/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import json
import logging
import uuid
from contextlib import contextmanager
from typing import Dict

import stomp
from stomp.connect import StompConnection11

logger = logging.getLogger(__name__)


class _Publisher:
def __init__(self, connection: StompConnection11, connection_configuration: Dict, destination_name: str) -> None:
self._connection_configuration = connection_configuration
self._connection = connection
self._destination_name = destination_name
self._default_content_type = "application/json;charset=utf-8"

def is_open(self):
return self._connection.is_connected()

def start(self):
self._connection.start()
self._connection.connect(**self._connection_configuration)

def close(self):
self._connection.disconnect()

def send(self, body, headers=None):
if hasattr(self, "_tmp_transaction_id"):
self._connection.send(
self._destination_name,
body=json.dumps(body),
content_type=self._default_content_type,
headers=headers,
transaction=self._tmp_transaction_id,
)
else:
self._connection.send(
self._destination_name, body=json.dumps(body), content_type=self._default_content_type, headers=headers
)


def build_publisher(destination_name, **connection_params) -> _Publisher:
logger.debug("Building publisher for %s...", destination_name)
hosts = [(connection_params.get("host"), connection_params.get("port"))]
client_id = connection_params.get("client_id", uuid.uuid4())
connection_configuration = {"wait": True, "headers": {"client-id": f"{client_id}-publisher"}}
conn = stomp.Connection(hosts)
publisher = _Publisher(conn, connection_configuration, destination_name)
return publisher


@contextmanager
def auto_open_close_connection(publisher: _Publisher):
try:
publisher.start()
yield
finally:
publisher.close()


@contextmanager
def do_inside_transaction(publisher: _Publisher):
with auto_open_close_connection(publisher):
try:
transaction_id = publisher._connection.begin()
setattr(publisher, "_tmp_transaction_id", transaction_id)
yield
publisher._connection.commit(getattr(publisher, "_tmp_transaction_id"))
except:
logger.exception("Could not conclude transaction properly")
publisher._connection.abort(getattr(publisher, "_tmp_transaction_id"))
finally:
if hasattr(publisher, "_tmp_transaction_id"):
delattr(publisher, "_tmp_transaction_id")
8 changes: 6 additions & 2 deletions django_graphql_playground/settings.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import os
from enum import Enum

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/2.1/howto/deployment/checklist/

Expand Down Expand Up @@ -43,6 +41,7 @@
"graphene_django",
"django_graphql_playground.apps.gqyl",
"rest_framework.authtoken",
"django_graphql_playground.apps.pubsub",
]

MIDDLEWARE = [
Expand Down Expand Up @@ -150,3 +149,8 @@

Enviroments = Enum("Level", "DEV QA HOM STAGING PROD")
ENVIRONMENT = os.getenv("ENVIRONMENT", Enviroments.DEV.name)

STOMP_SERVER_HOST = os.getenv("STOMP_SERVER_HOST")
STOMP_SERVER_PORT = os.getenv("STOMP_SERVER_PORT")
MY_DESTINATION = os.getenv("MY_DESTINATION")
TARGET_DESTINATION = os.getenv("TARGET_DESTINATION")
33 changes: 33 additions & 0 deletions django_graphql_playground/support/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging.config
import threading
from contextlib import contextmanager
from uuid import uuid4

local_threading = threading.local()


class ContextFilter(logging.Filter):
"""
This is a filter which injects contextual information into the log.
"""

def filter(self, record):
record.correlation_id = getattr(local_threading, "correlation_id", None)
return True


def _generated_correlation_id(custom_correlation_id=None) -> None:
local_threading.correlation_id = str(uuid4()) if custom_correlation_id is None else custom_correlation_id


def _unset_correlation_id() -> None:
del local_threading.correlation_id


@contextmanager
def do_log_with_correlation_id(custom_correlation_id=None):
try:
_generated_correlation_id(custom_correlation_id)
yield
finally:
_unset_correlation_id()

0 comments on commit f6272b3

Please sign in to comment.