optimize the callback receiver to buffer writes on high throughput

additionally, optimize away several per-event host lookups and
changed/failed propagation lookups

we've always performed these (fairly expensive) queries *on every event
save* - if you're processing tens of thousands of events in short
bursts, this is way too slow

this commit also introduces a new command for profiling the insertion
rate of events, `awx-manage callback_stats`

see: #5514
ryanpetrello committed Jan 14, 2020
1 parent 4112b20 commit 7aff1ee
Showing 17 changed files with 374 additions and 498 deletions.
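
The buffered-write logic itself lives in the callback receiver worker and is not among the hunks shown below. As a rough, hypothetical sketch of the idea described in the commit message (accumulate saved events and write them with a single bulk insert once a size or age threshold is reached), with the class and parameter names here being illustrative rather than AWX's actual code:

import time


class EventBuffer:
    """Illustrative sketch of write buffering; not AWX's actual implementation.

    Events are held in memory and written with one bulk INSERT once a size or
    age threshold is reached, instead of issuing one INSERT per saved event.
    """

    def __init__(self, model, max_events=1000, flush_interval=1.0):
        self.model = model                    # e.g. a Django model such as JobEvent
        self.max_events = max_events          # flush after this many buffered rows...
        self.flush_interval = flush_interval  # ...or after this many seconds
        self.buffer = []
        self.last_flush = time.time()

    def add(self, event_kwargs):
        self.buffer.append(self.model(**event_kwargs))
        too_many = len(self.buffer) >= self.max_events
        too_old = time.time() - self.last_flush >= self.flush_interval
        if too_many or too_old:
            self.flush()

    def flush(self):
        if self.buffer:
            # one round trip for the whole batch
            self.model.objects.bulk_create(self.buffer)
            self.buffer = []
        self.last_flush = time.time()
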
awx/api/serializers.py: 100 changes (0 additions, 100 deletions)
@@ -3874,26 +3874,6 @@ def to_representation(self, obj):
return data


class JobEventWebSocketSerializer(JobEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()

class Meta:
model = JobEvent
fields = ('*', 'event_name', 'group_name',)

def get_created(self, obj):
return obj.created.isoformat()

def get_modified(self, obj):
return obj.modified.isoformat()

def get_group_name(self, obj):
return 'job_events'


class ProjectUpdateEventSerializer(JobEventSerializer):
stdout = serializers.SerializerMethodField()
event_data = serializers.SerializerMethodField()
@@ -3925,26 +3905,6 @@ def get_event_data(self, obj):
return {}


class ProjectUpdateEventWebSocketSerializer(ProjectUpdateEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()

class Meta:
model = ProjectUpdateEvent
fields = ('*', 'event_name', 'group_name',)

def get_created(self, obj):
return obj.created.isoformat()

def get_modified(self, obj):
return obj.modified.isoformat()

def get_group_name(self, obj):
return 'project_update_events'


class AdHocCommandEventSerializer(BaseSerializer):

event_display = serializers.CharField(source='get_event_display', read_only=True)
@@ -3976,26 +3936,6 @@ def to_representation(self, obj):
return data


class AdHocCommandEventWebSocketSerializer(AdHocCommandEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()

class Meta:
model = AdHocCommandEvent
fields = ('*', 'event_name', 'group_name',)

def get_created(self, obj):
return obj.created.isoformat()

def get_modified(self, obj):
return obj.modified.isoformat()

def get_group_name(self, obj):
return 'ad_hoc_command_events'


class InventoryUpdateEventSerializer(AdHocCommandEventSerializer):

class Meta:
@@ -4011,26 +3951,6 @@ def get_related(self, obj):
return res


class InventoryUpdateEventWebSocketSerializer(InventoryUpdateEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()

class Meta:
model = InventoryUpdateEvent
fields = ('*', 'event_name', 'group_name',)

def get_created(self, obj):
return obj.created.isoformat()

def get_modified(self, obj):
return obj.modified.isoformat()

def get_group_name(self, obj):
return 'inventory_update_events'


class SystemJobEventSerializer(AdHocCommandEventSerializer):

class Meta:
@@ -4046,26 +3966,6 @@ def get_related(self, obj):
return res


class SystemJobEventWebSocketSerializer(SystemJobEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()

class Meta:
model = SystemJobEvent
fields = ('*', 'event_name', 'group_name',)

def get_created(self, obj):
return obj.created.isoformat()

def get_modified(self, obj):
return obj.modified.isoformat()

def get_group_name(self, obj):
return 'system_job_events'


class JobLaunchSerializer(BaseSerializer):

# Representational fields
awx/conf/settings.py: 16 changes (16 additions, 0 deletions)
@@ -28,6 +28,8 @@
from awx.conf.models import Setting
from awx.conf.migrations._reencrypt import decrypt_field as old_decrypt_field

import cachetools

# FIXME: Gracefully handle when settings are accessed before the database is
# ready (or during migrations).

@@ -136,6 +138,14 @@ def filter_sensitive(registry, key, value):
return value


# settings.__getattr__ is called *constantly*, and the LOG_AGGREGATOR_ ones are
# so ubiquitous when external logging is enabled that they should be kept in memory
# with a short TTL to avoid even having to contact memcached
# the primary use case for this optimization is the callback receiver
# when external logging is enabled
LOGGING_SETTINGS_CACHE = cachetools.TTLCache(maxsize=50, ttl=1)


class EncryptedCacheProxy(object):

def __init__(self, cache, registry, encrypter=None, decrypter=None):
@@ -437,11 +447,17 @@ def SETTINGS_MODULE(self):
return self._get_default('SETTINGS_MODULE')

def __getattr__(self, name):
if name.startswith('LOG_AGGREGATOR_'):
cached = LOGGING_SETTINGS_CACHE.get(name)
if cached:
return cached
value = empty
if name in self.all_supported_settings:
with _ctit_db_wrapper(trans_safe=True):
value = self._get_local(name)
if value is not empty:
if name.startswith('LOG_AGGREGATOR_'):
LOGGING_SETTINGS_CACHE[name] = value
return value
value = self._get_default(name)
# sometimes users specify RabbitMQ passwords that contain
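
The hunk above keeps the hot LOG_AGGREGATOR_* settings in a one-second in-memory TTL cache so the callback receiver does not have to round-trip to memcached (or the database) on every event. A minimal standalone sketch of the same cachetools pattern, with the helper name and loader being illustrative only:

import cachetools

# entries expire after one second, mirroring the cache added in the hunk above
_settings_cache = cachetools.TTLCache(maxsize=50, ttl=1)


def get_setting(name, loader):
    """Return a cached value for `name`, falling back to the expensive loader."""
    cached = _settings_cache.get(name)
    if cached is not None:
        return cached
    value = loader(name)        # e.g. a memcached or database lookup
    _settings_cache[name] = value
    return value

One detail of the diff worth noting: the check there is `if cached:` rather than `if cached is not None:`, so falsy cached values still fall through to the regular lookup path.
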
awx/main/dispatch/pool.py: 2 changes (1 addition, 1 deletion)
Expand Up @@ -277,7 +277,7 @@ def write(self, preferred_queue, body):
logger.warn("could not write to queue %s" % preferred_queue)
logger.warn("detail: {}".format(tb))
write_attempt_order.append(preferred_queue)
logger.warn("could not write payload to any queue, attempted order: {}".format(write_attempt_order))
logger.error("could not write payload to any queue, attempted order: {}".format(write_attempt_order))
return None

def stop(self, signum):
awx/main/dispatch/worker/base.py: 5 changes (4 additions, 1 deletion)
@@ -119,6 +119,9 @@ def stop(self, signum, frame):

class BaseWorker(object):

def read(self, queue):
return queue.get(block=True, timeout=1)

def work_loop(self, queue, finished, idx, *args):
ppid = os.getppid()
signal_handler = WorkerSignalHandler()
@@ -128,7 +131,7 @@ def work_loop(self, queue, finished, idx, *args):
if os.getppid() != ppid:
break
try:
body = queue.get(block=True, timeout=1)
body = self.read(queue)
if body == 'QUIT':
break
except QueueEmpty:
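
Pulling the `queue.get()` call into an overridable `read()` hook gives subclasses a seam to change how messages are consumed. A hypothetical sketch of how a buffering callback worker might use it (the class name and `FLUSH` sentinel are assumptions for illustration, not code from this commit):

from queue import Empty as QueueEmpty

from awx.main.dispatch.worker.base import BaseWorker


class BufferedCallbackWorker(BaseWorker):
    """Hypothetical subclass; not part of this diff."""

    def read(self, queue):
        try:
            # same behavior as the default read(): block briefly for a message
            return queue.get(block=True, timeout=1)
        except QueueEmpty:
            # nothing arrived within the timeout; hand the loop a sentinel so a
            # buffering worker gets a chance to flush pending writes while idle
            return {'event': 'FLUSH'}
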