heavily optimize the write speed of the callback receiver #5618
Conversation
awx/main/dispatch/worker/callback.py
        return 'FLUSH'

    def flush(self):
        should_flush = (
The main optimization is to avoid an insert per event and instead buffer event saves using bulk_create. The way this works is that we'll buffer up to 1000 events at once (though it's very doubtful that anything on the other end is publishing 4k events per second).
Additionally, in the work loop that pulls events over the IPC pipe, we time out every tenth of a second and use a special FLUSH message to signal that we should flush:
https://github.com/ansible/awx/pull/5618/files#diff-f37b92a11438678a6a32ac23a7790f05R39
So effectively, inserts will buffer for up to a tenth of a second. If there's a lot coming in, we're doing bulk inserts to cut down on individual insert + commit costs, and this makes a very notable performance difference for high-write workloads.
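To make the buffering pattern concrete, here is a minimal sketch of the idea, not the PR's actual code; the class name, constant, and 0.1s timeout below are illustrative assumptions:

    from queue import Empty

    FLUSH = 'FLUSH'    # sentinel meaning "the pipe has gone quiet, write what we have"
    BUFFER_MAX = 1000  # hypothetical cap before a forced flush

    class BufferedEventWriter:
        """Sketch of timeout-driven buffering in front of bulk_create."""

        def __init__(self, model):
            self.model = model  # a Django model class, e.g. JobEvent
            self.buff = []

        def read(self, queue):
            # Block briefly; if nothing arrives, ask the loop to flush instead.
            try:
                return queue.get(block=True, timeout=0.1)
            except Empty:
                return FLUSH

        def work_loop(self, queue):
            while True:
                body = self.read(queue)
                if body == FLUSH or len(self.buff) >= BUFFER_MAX:
                    self.flush()
                if body != FLUSH:
                    self.buff.append(self.model(**body))

        def flush(self):
            if self.buff:
                # One INSERT for the whole batch instead of one per event.
                self.model.objects.bulk_create(self.buff)
                self.buff = []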
update_fields.append(field)

# Update host related field from host_name.
if hasattr(self, 'job') and not self.host_id and self.host_name:
Another very large expense is that on every event insert, we're doing some type of inventory/host query here to map a host name for the current job to a host ID.
These individual queries aren't slow, but this model means that if you INSERT 100K events, you're doing 100K SELECT queries.
It turns out that this is completely unnecessary: because we compose the inventory ourselves, we can instead just pass the Tower host ID through hostvars and include it in the event payload before we even get to this code. That implementation lives here:
https://github.com/ansible/awx/pull/5618/files#diff-9d4ea1dd908b35fb92eaede4bd10bb46R1004
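Roughly, the idea looks like this (a sketch only; the key name and helper functions below are illustrative assumptions, not the exact ones the PR adds):

    def build_hostvars(hosts):
        """Sketch: stamp each host's DB primary key into generated hostvars."""
        hostvars = {}
        for host in hosts:
            hostvars[host.name] = {
                # Hypothetical key name; the real payload key may differ.
                'remote_tower_id': host.pk,
            }
        return hostvars

    def enrich_event(event_data, hostvars):
        """Sketch: copy the pre-computed host ID into the event payload so the
        callback receiver never has to SELECT it."""
        host_name = event_data.get('host')
        if host_name in hostvars:
            event_data['host_id'] = hostvars[host_name]['remote_tower_id']
        return event_data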
if self.failed is True:
    kwargs['failed'] = True
if kwargs:
    JobEvent.objects.filter(job_id=self.job_id, uuid=self.parent_uuid).update(**kwargs)
This is one of the worst offenders. Much like the per-event hostname lookup, this query means that on every INSERT where there's a parent_uuid (many of them, depending on your playbook), we're also doing a per-event UPDATE. Even worse, in situations where you've got many events that share the same parent (such as within a with_items context), you're issuing the exact same query over and over. As you can imagine, this is really expensive if your main_jobevent table is particularly large.
I observed this exact issue recently while profiling slowness in a customer install.
Instead of handling this per-event, this behavior can be implemented much more efficiently once at the very end of the job run:
https://github.com/ansible/awx/pull/5618/files#diff-9b1e5b80fcb01fab3e7a0ece4e371a50R299
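A minimal sketch of the end-of-run approach (the query shape here is an assumption about how the propagation could be expressed, not a copy of the PR's implementation): roll changed/failed up to the parents in one pass per flag, instead of one UPDATE per child event.

    from awx.main.models import JobEvent

    def propagate_child_flags(job_id):
        """Sketch: propagate changed/failed to parent events once, after the run."""
        for flag in ('changed', 'failed'):
            parent_uuids = (
                JobEvent.objects
                .filter(job_id=job_id, **{flag: True})
                .exclude(parent_uuid='')
                .values_list('parent_uuid', flat=True)
                .distinct()
            )
            # One UPDATE per flag for the whole job, not one per child event.
            JobEvent.objects.filter(
                job_id=job_id, uuid__in=list(parent_uuids)
            ).update(**{flag: True})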
@@ -456,38 +409,6 @@ def get_absolute_url(self, request=None):
def __str__(self):
    return u'%s @ %s' % (self.get_event_display2(), self.created.isoformat())

def _update_from_event_data(self):
    # Update job event hostname
We no longer need any of this code, because it's already in the event that goes across the message bus.
return updated_fields

def _update_hosts(self, extra_host_pks=None):
    # Update job event hosts m2m from host_name, propagate to parent events.
This method was dead code that nothing was calling. I can't see any evidence of its use since ~2014.
👍
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
def _update_from_event_data(self):
bulk_create doesn't actually call .save(), so I've changed this method to be something we can call manually to update the Django ORM objects before they're passed to bulk_create.
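In other words, the per-instance work that save() used to trigger now has to run explicitly before the batch insert. A minimal sketch (the surrounding code in the PR differs):

    from django.utils.timezone import now

    def flush(cls, events):
        """Sketch: do the per-instance bookkeeping save() would have done,
        then insert the whole batch at once."""
        right_now = now()
        for e in events:
            # bulk_create never calls save(), so apply this manually.
            e.created = e.modified = right_now
            e._update_from_event_data()
        cls.objects.bulk_create(events)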
copy_ansible_runner_on_event_status_into_tower_runner_on_event_status, or some other, more descriptive function name.
Organizationally, I see how it's a bit confusing; see the discussion on host_name elsewhere here. That field was promoted from the "Ansible" event data to the "runner" event data in the callback receiver before the message was sent. It would be better to have all of these promotion tasks in the same place ("copy", in your wording), but that organizational issue won't be helped by improved naming. So if we're not refactoring, I would leave it as-is.
Also, the logging here is a really important feature, and I would certainly like to separate that from the other actions, for testing and other things.
Agreed, all of these methods and the inheritance in models/events.py are really starting to show how complex they've grown. I wouldn't mind refactoring it to clean it up some, but I'm also not interested in doing that right now.
@@ -271,34 +271,44 @@ def get_event_display2(self):

def _update_from_event_data(self):
Same comment as https://github.com/ansible/awx/pull/5618/files#r364842220.
You'll notice I've removed the updated_fields tracking, because this method doesn't actually call .save() anymore (it just updates properties of self).
for e in events:
    e.created = e.modified = now
cls.objects.bulk_create(events)
for e in events:
bulk_create doesn't fire signals, so we have to iterate and call post_save ourselves so that websocket messages are emitted.
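Concretely, the flush ends up doing something along these lines (a simplified sketch of the pattern, not the exact code):

    from django.db.models.signals import post_save

    def flush_and_notify(cls, events):
        """Sketch: bulk insert, then emit the signal that bulk_create skipped
        so the websocket emit handlers still run."""
        cls.objects.bulk_create(events)
        for e in events:
            post_save.send(sender=cls, instance=e, created=True)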
oof
Actually, it's perfectly fast, to be honest (now that I've sped up the signal code to not use a DRF serializer).
instance = kwargs['instance']
created = kwargs['created']
if created:
    event_serializer = serializer(instance)
It turns out that this serializer has a lot of (mostly Django and DRF) overhead, and we don't need or display the majority of the content it generates anyways (like related links and summary fields).
This change makes this code a bit more brittle, but this is a critically important section of our code from a performance perspective. In my profiling, this change cut down per-event costs by over 50% 😱.
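For a sense of the trade-off, the replacement amounts to hand-building a small dict from the instance rather than running it through a DRF serializer; the field list below is an illustrative assumption, not the exact payload AWX emits:

    def websocket_payload(instance):
        """Sketch: emit only the fields the UI actually consumes."""
        return {
            'id': instance.id,
            'uuid': instance.uuid,
            'event': instance.event,
            'stdout': instance.stdout,
            'created': instance.created.isoformat(),
            # No related links or summary fields; the UI never displayed them.
        }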
awx/main/signals.py
if isinstance(instance, JobEvent):
    url = '/api/v2/job_events/{}'.format(instance.id)
if isinstance(instance, AdHocCommandEvent):
    url = '/api/v2/ad_hoc_command_events/{}'.format(instance.id)
These are the only two types of events with top-level URLs in the API.
@@ -128,7 +131,7 @@ def work_loop(self, queue, finished, idx, *args):
if os.getppid() != ppid:
    break
try:
    body = queue.get(block=True, timeout=1)
    body = self.read(queue)
no, looks like this change is just to accommodate the pattern in the callback-specific worker code
Right, this is just to persist the behavior for the base case (the dispatcher).
@@ -573,9 +573,6 @@ def IS_TESTING(argv=None):
# Additional environment variables to be passed to the ansible subprocesses
AWX_TASK_ENV = {}

# Flag to enable/disable updating hosts M2M when saving job events.
CAPTURE_JOB_EVENT_HOSTS = False
This was the original shortcut to skip this work if someone needed to eke out performance gains.
Looks like it got turned off as default behavior years ago - 🤷
awx/main/dispatch/worker/callback.py
try:
    return queue.get(block=True, timeout=.25)
except QueueEmpty:
    return 'FLUSH'
If there haven't been any new events from RMQ in .25 seconds, flush with bulk_create.
awx/main/dispatch/worker/callback.py
def flush(self):
    should_flush = (
        any([len(events) == 2000 for events in self.buff.values()]) or
2000 is probably overkill - I doubt we'd see this many events (effectively, 8k/s) per worker process from a real Ansible playbook, because playbooks that are doing real work don't emit events this fast.
awx/main/dispatch/worker/callback.py
job_identifier = 'unknown job'
job_key = 'unknown'
for key in event_map.keys():
for key, cls in event_map.items():
We've actually got to track a dictionary of buffers indexed by the event class. That way, if we get:

    JobEvent, JobEvent, ProjectUpdateEvent, InventoryUpdateEvent, JobEvent

...then when it's time to flush, we run:

    JobEvent.objects.bulk_create([JE, JE, JE])
    ProjectUpdateEvent.objects.bulk_create([PUE])
    InventoryUpdateEvent.objects.bulk_create([IUE])
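A minimal sketch of that bookkeeping (the class name here is illustrative, not the PR's actual code):

    from collections import defaultdict

    class PerClassBuffer:
        """Sketch: buffer events per model class so each flush is one
        bulk_create per class that has pending events."""

        def __init__(self):
            self.buff = defaultdict(list)  # {EventClass: [unsaved instances]}

        def add(self, cls, instance):
            self.buff[cls].append(instance)

        def flush(self):
            for cls, events in self.buff.items():
                if events:
                    cls.objects.bulk_create(events)
            self.buff = defaultdict(list)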
awx/main/dispatch/worker/callback.py
@@ -104,7 +133,14 @@ def _save_event_data():
retries = 0
while retries <= self.MAX_RETRIES:
    try:
        _save_event_data()
        kwargs = cls.create_from_data(**body)
        workflow_job_id = kwargs.pop('workflow_job_id', None)
workflow_job_id isn't an actual column, so we can't pass it to the event __init__(). Instead, we use setattr on the instance so that the message sent to external loggers contains it.
related: #4731
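Roughly, the pattern is (a sketch only; cls and body stand in for the event class and the raw payload dict, as in the diff above):

    def build_event(cls, body):
        """Sketch: drop non-column keys before __init__, reattach them afterwards."""
        kwargs = cls.create_from_data(**body)
        workflow_job_id = kwargs.pop('workflow_job_id', None)
        event = cls(**kwargs)
        if workflow_job_id:
            # Not a real DB column, but external log emitters want to see it.
            setattr(event, 'workflow_job_id', workflow_job_id)
        return event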
except (AttributeError, TypeError):
    pass

if isinstance(self, JobEvent):
    hostnames = self._hostnames()
This is all job-event-specific processing that happens for playbook_on_stats, which I've moved here for consistency.
I tested this some by spinning up two [...]. Given that this is an 8 core box, I decided to set [...].

Next, I stopped the callback receiver and pushed 1.5M rows into the queue:

    ~ awx-manage shell_plus
    >>> from awx.main.queue import CallbackQueueDispatcher
    >>> d = CallbackQueueDispatcher()
    >>> import uuid
    >>> for i in range(1500000):
    ...     d.dispatch({'uuid': str(uuid.uuid4()), 'stdout': 'Line %d' % i, 'job_id': 149})

(...and waited a few minutes)

Next, I restarted the callback receiver and waited to see what throughput looked like:

    awx=> SELECT COUNT(*) FROM main_jobevent WHERE created > now() - '1 minute'::interval;
     count
    --------
    109279
    (1 row)

So around 1800 events per second. I couldn't continue my experiment further because I ran out of disk space 😂.
class Command(BaseCommand):

    def handle(self, *args, **options):
        super(Command, self).__init__()
Is there any reason that handle calls the super __init__?
Nope, I think that's just a copy-paste typo. Good eye.
additionally, optimize away several per-event host lookups and changed/failed propagation lookups

we've always performed these (fairly expensive) queries *on every event save* - if you're processing tens of thousands of events in short bursts, this is way too slow

this commit also introduces a new command for profiling the insertion rate of events, `awx-manage callback_stats`

see: ansible#5514
The only other thing I want to say is that I'd really like to have more things planned for the management command. But I have no problem with adding it now, or with this as its default behavior.
the callback receiver is still fairly slow when logging is enabled due to constant setting lookups; this speeds things up considerably

related: ansible#5618
see: #5514
see: #5590
Below added by @chrismeyersfsu
Notes for QE: