Skip to content

Commit

Permalink
optimize per-event host lookups and changed/failed propagation lookups
Browse files Browse the repository at this point in the history
we've always performed these (fairly expensive) queries *on every event
save* - if you're processing tens of thousands of events in short
bursts, this is way too slow

see: #5514
  • Loading branch information
ryanpetrello committed Jan 9, 2020
1 parent afb922f commit eed7201
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 79 deletions.
8 changes: 6 additions & 2 deletions awx/main/dispatch/worker/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,12 @@ def perform_work(self, body):
retries = 0
while retries <= self.MAX_RETRIES:
try:
event = cls(**cls.create_from_data(**body))
event.remove_me()
kwargs = cls.create_from_data(**body)
workflow_job_id = kwargs.pop('workflow_job_id', None)
event = cls(**kwargs)
if workflow_job_id:
setattr(event, 'workflow_job_id', workflow_job_id)
event._update_from_event_data()
self.buff.setdefault(cls, []).append(event)
self.flush()
break
Expand Down
104 changes: 32 additions & 72 deletions awx/main/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class BasePlaybookEvent(CreatedModifiedModel):
VALID_KEYS = [
'event', 'event_data', 'playbook', 'play', 'role', 'task', 'created',
'counter', 'uuid', 'stdout', 'parent_uuid', 'start_line', 'end_line',
'verbosity'
'host_id', 'host_name', 'verbosity',
]

class Meta:
Expand Down Expand Up @@ -288,10 +288,35 @@ def _update_from_event_data(self):
self.changed = bool(sum(changed_dict.values()))
except (AttributeError, TypeError):
pass

if isinstance(self, JobEvent):
hostnames = self._hostnames()
self._update_host_summary_from_stats(hostnames)
try:
self.job.inventory.update_computed_fields()
except DatabaseError:
logger.exception('Computed fields database error saving event {}'.format(self.pk))

# find parent links and propagate changed=T and failed=T
changed = self.job.job_events.filter(changed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
failed = self.job.job_events.filter(failed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa

JobEvent.objects.filter(
job_id=self.job_id, uuid__in=changed
).update(changed=True)
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=failed
).update(failed=True)

for field in ('playbook', 'play', 'task', 'role'):
value = force_text(event_data.get(field, '')).strip()
if value != getattr(self, field):
setattr(self, field, value)
if isinstance(self, JobEvent):
analytics_logger.info(
'Event data saved.',
extra=dict(python_objects=dict(job_event=self))
)

@classmethod
def create_from_data(cls, **kwargs):
Expand All @@ -317,58 +342,12 @@ def create_from_data(cls, **kwargs):
kwargs.pop('created', None)

sanitize_event_keys(kwargs, cls.VALID_KEYS)
workflow_job_id = kwargs.pop('workflow_job_id', None)
return kwargs
job_event = cls.objects.create(**kwargs)
if workflow_job_id:
setattr(job_event, 'workflow_job_id', workflow_job_id)
analytics_logger.info('Event data saved.', extra=dict(python_objects=dict(job_event=job_event)))
return job_event

@property
def job_verbosity(self):
return 0

def remove_me(self, *args, **kwargs):
    """Sync derived model fields from event_data and propagate state to
    related rows.

    NOTE(review): this runs on every event save; the per-event host
    lookup and the parent changed/failed propagation below each issue
    database queries, which is expensive during large event bursts.
    """
    # Update model fields from event data.
    self._update_from_event_data()
    # Update host related field from host_name.
    if hasattr(self, 'job') and not self.host_id and self.host_name:
        if self.job.inventory.kind == 'smart':
            # optimization to avoid calling inventory.hosts, which
            # can take a long time to run under some circumstances
            from awx.main.models.inventory import SmartInventoryMembership
            membership = SmartInventoryMembership.objects.filter(
                inventory=self.job.inventory, host__name=self.host_name
            ).first()
            if membership:
                host_id = membership.host_id
            else:
                # no membership row -> explicitly clear the FK candidate
                host_id = None
        else:
            # fetch only the id column to keep the lookup cheap
            host_qs = self.job.inventory.hosts.filter(name=self.host_name)
            host_id = host_qs.only('id').values_list('id', flat=True).first()
        if host_id != self.host_id:
            self.host_id = host_id

    # Propagate changed/failed flags up to the parent event row, if any.
    if self.parent_uuid:
        kwargs = {}
        if self.changed is True:
            kwargs['changed'] = True
        if self.failed is True:
            kwargs['failed'] = True
        if kwargs:
            JobEvent.objects.filter(job_id=self.job_id, uuid=self.parent_uuid).update(**kwargs)

    # The final stats event triggers host summary rollup and inventory
    # computed-field refresh for the whole job.
    if self.event == 'playbook_on_stats':
        hostnames = self._hostnames()
        self._update_host_summary_from_stats(hostnames)
        try:
            self.job.inventory.update_computed_fields()
        except DatabaseError:
            # best-effort: log and continue rather than fail the event save
            logger.exception('Computed fields database error saving event {}'.format(self.pk))



class JobEvent(BasePlaybookEvent):
'''
Expand Down Expand Up @@ -432,13 +411,6 @@ def get_absolute_url(self, request=None):
def __str__(self):
    """Human-readable label: event display name plus ISO-8601 created time."""
    display = self.get_event_display2()
    timestamp = self.created.isoformat()
    return u'%s @ %s' % (display, timestamp)

def _update_from_event_data(self):
# Update job event hostname
super(JobEvent, self)._update_from_event_data()
value = force_text(self.event_data.get('host', '')).strip()
if value != getattr(self, 'host_name'):
setattr(self, 'host_name', value)

def _hostnames(self):
hostnames = set()
try:
Expand Down Expand Up @@ -570,14 +542,7 @@ def create_from_data(cls, **kwargs):
kwargs.pop('created', None)

sanitize_event_keys(kwargs, cls.VALID_KEYS)
kwargs.pop('workflow_job_id', None)
event = cls.objects.create(**kwargs)
if isinstance(event, AdHocCommandEvent):
analytics_logger.info(
'Event data saved.',
extra=dict(python_objects=dict(job_event=event))
)
return event
return kwargs

def get_event_display(self):
'''
Expand Down Expand Up @@ -670,22 +635,17 @@ class Meta:
def get_absolute_url(self, request=None):
return reverse('api:ad_hoc_command_event_detail', kwargs={'pk': self.pk}, request=request)

def remove_me(self, *args, **kwargs):
def _update_from_event_data(self):
res = self.event_data.get('res', None)
if self.event in self.FAILED_EVENTS:
if not self.event_data.get('ignore_errors', False):
self.failed = True
if isinstance(res, dict) and res.get('changed', False):
self.changed = True
self.host_name = self.event_data.get('host', '').strip()
if not self.host_id and self.host_name:
host_qs = self.ad_hoc_command.inventory.hosts.filter(name=self.host_name)
try:
host_id = host_qs.only('id').values_list('id', flat=True)
if host_id.exists():
self.host_id = host_id[0]
except (IndexError, AttributeError):
pass
analytics_logger.info(
'Event data saved.',
extra=dict(python_objects=dict(job_event=self))
)


class InventoryUpdateEvent(BaseCommandEvent):
Expand Down
8 changes: 4 additions & 4 deletions awx/main/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ def emit_event_detail(cls, relation, **kwargs):
'event': instance.event,
'failed': instance.failed,
'changed': instance.changed,
'event_level': instance.event_level,
'play': instance.play,
'role': instance.role,
'task': instance.task
'event_level': getattr(instance, 'event_level', ''),
'play': getattr(instance, 'play', ''),
'role': getattr(instance, 'role', ''),
'task': getattr(instance, 'task', ''),
}
)

Expand Down
17 changes: 16 additions & 1 deletion awx/main/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ class BaseTask(object):
def __init__(self):
    # Paths collected for later cleanup (name-based; exact use is outside
    # this view — TODO confirm against the rest of BaseTask).
    self.cleanup_paths = []
    # Set when this job was spawned by a workflow job; forwarded onto
    # emitted events as workflow_job_id in event_handler().
    self.parent_workflow_job_id = None
    # host name -> host id mapping, populated in build_inventory(); lets
    # event_handler() associate emitted events with Host objects without
    # a per-event database lookup.
    self.host_map = {}

def update_model(self, pk, _attempt=0, **updates):
"""Reload the model instance from the database and update the
Expand Down Expand Up @@ -1001,11 +1002,17 @@ def should_use_proot(self, instance):
return False

def build_inventory(self, instance, private_data_dir):
script_params = dict(hostvars=True)
script_params = dict(hostvars=True, towervars=True)
if hasattr(instance, 'job_slice_number'):
script_params['slice_number'] = instance.job_slice_number
script_params['slice_count'] = instance.job_slice_count
script_data = instance.inventory.get_script_data(**script_params)
# maintain a list of host_name --> host_id
# so we can associate emitted events to Host objects
self.host_map = {
hostname: hv['remote_tower_id']
for hostname, hv in script_data.get('_meta', {}).get('hostvars', {}).items()
}
json_data = json.dumps(script_data)
handle, path = tempfile.mkstemp(dir=private_data_dir)
f = os.fdopen(handle, 'w')
Expand Down Expand Up @@ -1114,6 +1121,14 @@ def event_handler(self, event_data):
event_data.pop('parent_uuid', None)
if self.parent_workflow_job_id:
event_data['workflow_job_id'] = self.parent_workflow_job_id
if self.host_map:
host = event_data.get('event_data', {}).get('host', '').strip()
if host:
event_data['host_name'] = host
event_data['host_id'] = self.host_map[host]
else:
event_data['host_name'] = ''
event_data['host_id'] = ''
should_write_event = False
event_data.setdefault(self.event_data_key, self.instance.id)
self.dispatcher.dispatch(event_data)
Expand Down

0 comments on commit eed7201

Please sign in to comment.