Replace rabbitmq with redis #6034

Merged: 41 commits, Mar 18, 2020

Changes from all commits
c8eeaca
POC channels 2
chrismeyersfsu Nov 8, 2019
355fb12
redis events
chrismeyersfsu Jan 7, 2020
558e928
POC postgres broker
chrismeyersfsu Dec 20, 2019
2a2c34f
combine all the broker replacement pieces
chrismeyersfsu Jan 8, 2020
3fec697
fix websocket job subscription access control
chrismeyersfsu Jan 13, 2020
50b56aa
autobahn 20.1.2 released an hour ago
chrismeyersfsu Jan 14, 2020
dc6c353
remove support for multi-reader dispatch queue
chrismeyersfsu Jan 16, 2020
5818dcc
prefer simple async -> sync
chrismeyersfsu Jan 22, 2020
0883739
satisfy generic Role code
chrismeyersfsu Jan 23, 2020
feac93f
add websocket group unsubscribe reply
chrismeyersfsu Jan 24, 2020
3f2d757
update awxkit to use new unsubscribe event
chrismeyersfsu Jan 24, 2020
ea29f4b
account for isolated job status
chrismeyersfsu Jan 27, 2020
403e9bb
add websocket health information
chrismeyersfsu Feb 5, 2020
be58906
remove kombu
chrismeyersfsu Feb 10, 2020
e94bb44
replace rabbitmq with redis
chrismeyersfsu Feb 11, 2020
45ce6d7
Initial migration of rabbitmq -> redis for k8s installs
shanemcd Feb 13, 2020
c06b630
remove health info
chrismeyersfsu Feb 14, 2020
03b7302
websockets aware of Instance changes
chrismeyersfsu Feb 14, 2020
f5193e5
resolve rebase errors
chrismeyersfsu Feb 14, 2020
3c5c9c6
move broadcast websocket out into its own process
chrismeyersfsu Feb 14, 2020
14320bc
handle websocket unsubscribe
chrismeyersfsu Feb 17, 2020
3b9e67e
remove channel group model
chrismeyersfsu Feb 17, 2020
0da94ad
add missing service name to dev env
chrismeyersfsu Feb 21, 2020
b6b9802
increase per-channel capacity
chrismeyersfsu Feb 21, 2020
d6594ab
add broadcast websocket metrics
chrismeyersfsu Feb 21, 2020
8350bb3
robust broadcast websocket error handling
chrismeyersfsu Feb 21, 2020
e25bd93
change dispatcher test to make required queue
chrismeyersfsu Feb 21, 2020
093d204
fix flake8
chrismeyersfsu Feb 21, 2020
9e5fe7f
translate Instance hostname to safe analytics name
chrismeyersfsu Feb 23, 2020
2b59af3
safely operate in async or sync context
chrismeyersfsu Feb 26, 2020
3f5e2a3
try to make openshift build happy
chrismeyersfsu Feb 27, 2020
d58df0f
fix sliding window calculation
chrismeyersfsu Feb 28, 2020
770b457
redis socket support
chrismeyersfsu Mar 5, 2020
1caa2e0
work around a limitation in postgres notify to properly support copying
ryanpetrello Mar 2, 2020
b58c71b
remove broadcast websockets view
chrismeyersfsu Mar 16, 2020
59c9de2
awxkit python2.7 compatible print
chrismeyersfsu Mar 17, 2020
89163f2
remove redis broker url test
chrismeyersfsu Mar 17, 2020
18f5dd6
add websocket backplane documentation
chrismeyersfsu Mar 18, 2020
87de0cf
flake8, pytest, license fixes
chrismeyersfsu Mar 18, 2020
7c3cbe6
add a license for redis-cli
ryanpetrello Mar 18, 2020
f1ee963
fix up rebased migrations
ryanpetrello Mar 18, 2020
33 changes: 2 additions & 31 deletions Makefile
@@ -265,28 +265,6 @@ migrate:
dbchange:
$(MANAGEMENT_COMMAND) makemigrations

server_noattach:
Contributor: Did you make sure nobody on the eng team is still using this target? I wouldn't think so, given that we're now running everything in supervisord, which is much more flexible.

Member: Hey, I still use this occasionally :(

tmux new-session -d -s awx 'exec make uwsgi'
tmux rename-window 'AWX'
tmux select-window -t awx:0
tmux split-window -v 'exec make dispatcher'
tmux new-window 'exec make daphne'
tmux select-window -t awx:1
tmux rename-window 'WebSockets'
tmux split-window -h 'exec make runworker'
tmux split-window -v 'exec make nginx'
tmux new-window 'exec make receiver'
tmux select-window -t awx:2
tmux rename-window 'Extra Services'
tmux select-window -t awx:0

server: server_noattach
tmux -2 attach-session -t awx

# Use with iterm2's native tmux protocol support
servercc: server_noattach
tmux -2 -CC attach-session -t awx

supervisor:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
@@ -311,18 +289,11 @@ daphne:
fi; \
daphne -b 127.0.0.1 -p 8051 awx.asgi:channel_layer

runworker:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(PYTHON) manage.py runworker --only-channels websocket.*

# Run the built-in development webserver (by default on http://localhost:8013).
runserver:
wsbroadcast:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(PYTHON) manage.py runserver
$(PYTHON) manage.py run_wsbroadcast

# Run to start the background task dispatcher for development.
dispatcher:
9 changes: 8 additions & 1 deletion awx/api/generics.py
@@ -5,10 +5,12 @@
import inspect
import logging
import time
import uuid
import urllib.parse

# Django
from django.conf import settings
from django.core.cache import cache
from django.db import connection
from django.db.models.fields import FieldDoesNotExist
from django.db.models.fields.related import OneToOneRel
@@ -973,14 +975,19 @@ def post(self, request, *args, **kwargs):
if hasattr(new_obj, 'admin_role') and request.user not in new_obj.admin_role.members.all():
new_obj.admin_role.members.add(request.user)
if sub_objs:
# store the copied object dict into memcached, because it's
# often too large for postgres' notification bus
# (which has a default maximum message size of 8k)
Member: Is that not a problem for other stuff, like job events?

And how does this work on a cluster where a different instance may be performing the deep copy?

Contributor: This limitation is specific to only dispatcher tasks (which use postgres listen/notify). Job events use redis.

Contributor: > and how does this work on a cluster where a different instance may be performing the deep copy?

See: https://github.com/ansible/awx/pull/6034/files#diff-9d4ea1dd908b35fb92eaede4bd10bb46R2856

We made it so that this task is routed to dispatchers on the same instance as uwsgi (they don't get distributed to other nodes).

key = 'deep-copy-{}'.format(str(uuid.uuid4()))
cache.set(key, sub_objs, timeout=3600)
permission_check_func = None
if hasattr(type(self), 'deep_copy_permission_check_func'):
permission_check_func = (
type(self).__module__, type(self).__name__, 'deep_copy_permission_check_func'
)
trigger_delayed_deep_copy(
self.model.__module__, self.model.__name__,
obj.pk, new_obj.pk, request.user.pk, sub_objs,
obj.pk, new_obj.pk, request.user.pk, key,
permission_check_func=permission_check_func
)
serializer = self._get_copy_return_serializer(new_obj)
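
The thread above captures the key constraint behind this hunk: postgres NOTIFY payloads are capped (8000 bytes by default), so the copied-object dict is parked in the cache and only the cache key travels over the dispatcher's listen/notify bus. Below is a minimal sketch of that pattern; `notify` is a hypothetical stand-in for the dispatcher's pg_notify plumbing, and only the cache usage and the 8 kB ceiling come from the PR itself.

import json
import uuid

from django.core.cache import cache

PG_NOTIFY_MAX_BYTES = 8000  # default postgres NOTIFY payload limit


def submit(notify, payload):
    # serialize the task payload; fall back to the cache when it won't fit
    body = json.dumps(payload)
    if len(body.encode('utf-8')) >= PG_NOTIFY_MAX_BYTES:
        key = 'deep-copy-{}'.format(uuid.uuid4())
        cache.set(key, payload, timeout=3600)  # expire in case the worker dies
        body = json.dumps({'cache_key': key})
    notify(body)


def receive(message):
    # worker side: resolve a cache key back into the original payload
    data = json.loads(message)
    if isinstance(data, dict) and 'cache_key' in data:
        data = cache.get(data['cache_key'])
    return data

Because deep-copy tasks are routed to a dispatcher on the same instance as the uwsgi process that enqueued them, the worker's cache read is guaranteed to see the web process's write as long as both share a cache backend on that node.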
4 changes: 3 additions & 1 deletion awx/api/urls/urls.py
@@ -34,7 +34,9 @@
OAuth2ApplicationDetail,
)

from awx.api.views.metrics import MetricsView
from awx.api.views.metrics import (
MetricsView,
)

from .organization import urls as organization_urls
from .user import urls as user_urls
1 change: 1 addition & 0 deletions awx/api/views/metrics.py
@@ -39,3 +39,4 @@ def get(self, request):
if (request.user.is_superuser or request.user.is_system_auditor):
return Response(metrics().decode('UTF-8'))
raise PermissionDenied()

10 changes: 5 additions & 5 deletions awx/asgi.py
@@ -2,14 +2,15 @@
# All Rights Reserved.
import os
import logging
import django
from awx import __version__ as tower_version

# Prepare the AWX environment.
from awx import prepare_env, MODE
prepare_env() # NOQA

from django.core.wsgi import get_wsgi_application # NOQA
from channels.asgi import get_channel_layer
from channels.routing import get_default_application


"""
ASGI config for AWX project.
@@ -32,6 +33,5 @@


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "awx.settings")


channel_layer = get_channel_layer()
django.setup()
channel_layer = get_default_application()
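
This asgi.py change tracks the Channels 1 to Channels 2 migration: instead of fetching a channel layer, the entry point now calls django.setup() and exposes the application named by settings.ASGI_APPLICATION. For orientation, here is a minimal sketch of the kind of routing module get_default_application() resolves to under Channels 2; the consumer and URL pattern are illustrative placeholders, not AWX's actual routing.

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import re_path

from myapp.consumers import EventConsumer  # hypothetical consumer class

application = ProtocolTypeRouter({
    # plain http requests continue to be served by Django's regular view stack
    'websocket': AuthMiddlewareStack(
        URLRouter([
            re_path(r'^websocket/$', EventConsumer),
        ])
    ),
})

Note that Channels 2 takes the consumer class directly in the router; the .as_asgi() call only became necessary in Channels 3.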
166 changes: 166 additions & 0 deletions awx/main/analytics/broadcast_websocket.py
@@ -0,0 +1,166 @@
import datetime
import asyncio
import logging
import aioredis
import redis
import re

from prometheus_client import (
generate_latest,
Gauge,
Counter,
Enum,
CollectorRegistry,
)

from django.conf import settings


BROADCAST_WEBSOCKET_REDIS_KEY_NAME = 'broadcast_websocket_stats'


logger = logging.getLogger('awx.main.analytics.broadcast_websocket')


def dt_to_seconds(dt):
return int((dt - datetime.datetime(1970,1,1)).total_seconds())


def now_seconds():
return dt_to_seconds(datetime.datetime.now())


# One-second buckets summed over a sliding one-minute window
class FixedSlidingWindow():
def __init__(self, start_time=None):
self.buckets = dict()
self.start_time = start_time or now_seconds()

def cleanup(self, now_bucket=None):
now_bucket = now_bucket or now_seconds()
if self.start_time + 60 <= now_bucket:
# slide the window start forward so it keeps only the most recent 60 seconds
self.start_time = now_bucket - 60 + 1

# Delete old entries
for k in list(self.buckets.keys()):
if k < self.start_time:
del self.buckets[k]

def record(self, ts=None):
ts = ts or datetime.datetime.now()
now_bucket = dt_to_seconds(ts)

val = self.buckets.get(now_bucket, 0)
self.buckets[now_bucket] = val + 1

self.cleanup(now_bucket)

def render(self):
self.cleanup()
return sum(self.buckets.values()) or 0


class BroadcastWebsocketStatsManager():
def __init__(self, event_loop, local_hostname):
self._local_hostname = local_hostname

self._event_loop = event_loop
self._stats = dict()
self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME

def new_remote_host_stats(self, remote_hostname):
self._stats[remote_hostname] = BroadcastWebsocketStats(self._local_hostname,
remote_hostname)
return self._stats[remote_hostname]

def delete_remote_host_stats(self, remote_hostname):
del self._stats[remote_hostname]

async def run_loop(self):
try:
redis_conn = await aioredis.create_redis_pool(settings.BROKER_URL)
while True:
stats_data_str = ''.join(stat.serialize() for stat in self._stats.values())
await redis_conn.set(self._redis_key, stats_data_str)

await asyncio.sleep(settings.BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS)
except Exception as e:
logger.warn(e)
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS)
self.start()

def start(self):
self.async_task = self._event_loop.create_task(self.run_loop())
return self.async_task

@classmethod
def get_stats_sync(cls):
'''
Stringified version of all the stats
'''
redis_conn = redis.Redis.from_url(settings.BROKER_URL)
return redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME)


class BroadcastWebsocketStats():
def __init__(self, local_hostname, remote_hostname):
self._local_hostname = local_hostname
self._remote_hostname = remote_hostname
self._registry = CollectorRegistry()

# TODO: More robust replacement
self.name = self.safe_name(self._local_hostname)
self.remote_name = self.safe_name(self._remote_hostname)

self._messages_received_total = Counter(f'awx_{self.remote_name}_messages_received_total',
'Number of messages received, to be forwarded, by the broadcast websocket system',
registry=self._registry)
self._messages_received = Gauge(f'awx_{self.remote_name}_messages_received',
'Number forwarded messages received by the broadcast websocket system, for the duration of the current connection',
registry=self._registry)
self._connection = Enum(f'awx_{self.remote_name}_connection',
'Websocket broadcast connection',
states=['disconnected', 'connected'],
registry=self._registry)
self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start',
'Time the connection was established',
registry=self._registry)

self._messages_received_per_minute = Gauge(f'awx_{self.remote_name}_messages_received_per_minute',
'Messages received per minute',
registry=self._registry)
self._internal_messages_received_per_minute = FixedSlidingWindow()

def safe_name(self, s):
# Replace all non-alphanumeric characters with _
return re.sub('[^0-9a-zA-Z]+', '_', s)

def unregister(self):
# prometheus_client's unregister() takes the collector objects, not metric names
self._registry.unregister(self._messages_received)
self._registry.unregister(self._connection)

def record_message_received(self):
self._internal_messages_received_per_minute.record()
self._messages_received.inc()
self._messages_received_total.inc()

def record_connection_established(self):
self._connection.state('connected')
self._connection_start.set_to_current_time()
# remember when the connection came up so get_connection_duration() can work
self._connection_established_ts = datetime.datetime.now()
self._messages_received.set(0)

def record_connection_lost(self):
self._connection.state('disconnected')

def get_connection_duration(self):
return (datetime.datetime.now() - self._connection_established_ts).total_seconds()

def render(self):
msgs_per_min = self._internal_messages_received_per_minute.render()
self._messages_received_per_minute.set(msgs_per_min)

def serialize(self):
self.render()

registry_data = generate_latest(self._registry).decode('UTF-8')
return registry_data
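
FixedSlidingWindow above is a coarse events-per-minute counter: record() increments a one-second bucket and render() sums whatever buckets survive cleanup(). A short usage sketch, assuming the module above is importable (the printed value depends on wall-clock timing):

import time

window = FixedSlidingWindow()
for _ in range(5):
    window.record()      # five events land in the current one-second bucket
time.sleep(1)
window.record()          # a sixth event lands in the next bucket
print(window.render())   # 6: both buckets fall inside the 60-second window

Each BroadcastWebsocketStats instance publishes that per-minute figure as a Gauge alongside its counters, and serialize() renders its whole registry in the Prometheus text exposition format, which is what run_loop() periodically writes to redis.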