reduce per-job database query count
* Do not query the database, once per job, for the set of Instances that belong to
the group we are trying to fit the job onto.
* Instead, cache the set of instances per instance group.
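
In outline, the change trades a per-job ORM query for a one-time snapshot of eligible instances keyed by instance group. The sketch below is a simplified illustration of that caching pattern, using plain-Python stand-ins rather than the real Instance/InstanceGroup models:

```python
from types import SimpleNamespace

# Hypothetical in-memory stand-ins for Instance and InstanceGroup rows.
instances = [
    SimpleNamespace(hostname='node1', capacity=100, enabled=True),
    SimpleNamespace(hostname='node2', capacity=0, enabled=True),   # no capacity, filtered out
]
groups = [SimpleNamespace(name='tower', instance_hostnames=['node1', 'node2'])]

# Filter the eligible instances once (the real code does this with a single ORM query) ...
eligible = {i.hostname: i for i in instances if i.capacity > 0 and i.enabled}

# ... and cache them per instance group, so fitting each pending job becomes an
# in-memory lookup instead of a fresh database query per job.
instances_by_group = {
    g.name: [eligible[h] for h in g.instance_hostnames if h in eligible]
    for g in groups
}

print(instances_by_group['tower'])  # only node1 survives the filter
```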
chrismeyersfsu committed Oct 15, 2020
1 parent 770206b commit 7b0ce7a
Showing 3 changed files with 61 additions and 37 deletions.
10 changes: 6 additions & 4 deletions awx/main/models/ha.py
@@ -261,18 +261,20 @@ class Meta:
app_label = 'main'


def fit_task_to_most_remaining_capacity_instance(self, task):
@staticmethod
def fit_task_to_most_remaining_capacity_instance(task, instances):
instance_most_capacity = None
for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
for i in instances:
if i.remaining_capacity >= task.task_impact and \
(instance_most_capacity is None or
i.remaining_capacity > instance_most_capacity.remaining_capacity):
instance_most_capacity = i
return instance_most_capacity

def find_largest_idle_instance(self):
@staticmethod
def find_largest_idle_instance(instances):
largest_instance = None
for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
for i in instances:
if i.jobs_running == 0:
if largest_instance is None:
largest_instance = i
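
With both selection helpers turned into static methods that accept an explicit iterable, a caller can hand them the cached snapshot directly. A minimal usage sketch, assuming lightweight SimpleNamespace objects that carry only the attributes the helpers read, and a configured Django environment where the AWX models import:

```python
from types import SimpleNamespace

from awx.main.models import InstanceGroup

# Hypothetical cached snapshot; the task manager builds these from the real Instance rows.
cached = [
    SimpleNamespace(hostname='node1', capacity=100, remaining_capacity=80, jobs_running=1),
    SimpleNamespace(hostname='node2', capacity=100, remaining_capacity=100, jobs_running=0),
]
task = SimpleNamespace(task_impact=43)

best_fit = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, cached)
idle = InstanceGroup.find_largest_idle_instance(cached)
# best_fit is node2 (largest remaining capacity that still fits the task)
# idle is node2 (the only instance with no jobs running)
```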
53 changes: 42 additions & 11 deletions awx/main/scheduler/task_manager.py
@@ -7,6 +7,7 @@
import uuid
import json
import random
from types import SimpleNamespace

# Django
from django.db import transaction, connection
@@ -45,17 +46,46 @@
class TaskManager():

def __init__(self):
'''
Do NOT put database queries or other potentially expensive operations
in the task manager init. The task manager object is created every time a
job is created or transitions state, and every 30 seconds on each tower node.
More often than not, the object is destroyed quickly because the NOOP case is hit.
The NOOP case is short-circuit logic. If the task manager realizes that another instance
of the task manager is already running, then it short-circuits and decides not to run.
'''
self.graph = dict()
# start task limit indicates how many pending jobs can be started on this
# .schedule() run. Starting jobs is expensive, and there is code in place to reap
# the task manager after 5 minutes. At scale, the task manager can easily take more than
# 5 minutes to start pending jobs. If this limit is reached, the remaining pending jobs
# are not started in this run and are picked up on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT

def after_lock_init(self):
'''
Init AFTER we know this instance of the task manager will run because the lock is acquired.
'''
instances = Instance.objects.filter(capacity__gt=0, enabled=True)
self.real_instances = {i.hostname: i for i in instances}

instances_partial = [SimpleNamespace(obj=instance,
remaining_capacity=instance.remaining_capacity,
capacity=instance.capacity,
jobs_running=instance.jobs_running,
hostname=instance.hostname) for instance in instances]

instances_by_hostname = {i.hostname: i for i in instances_partial}

for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
capacity_total=rampart_group.capacity,
consumed_capacity=0)
consumed_capacity=0,
instances=[])
for instance in rampart_group.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
if instance.hostname in instances_by_hostname:
self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])

def is_job_blocked(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
@@ -466,7 +496,6 @@ def process_pending_tasks(self, pending_tasks):
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
idle_instance_that_fits = None
if isinstance(task, WorkflowJob):
if task.unified_job_template_id in running_workflow_templates:
if not task.allow_simultaneous:
@@ -483,24 +512,23 @@
found_acceptable_queue = True
break

if idle_instance_that_fits is None:
idle_instance_that_fits = rampart_group.find_largest_idle_instance()
remaining_capacity = self.get_remaining_capacity(rampart_group.name)
if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(
rampart_group.name, remaining_capacity))
continue

execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
if execution_instance:
logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
elif not execution_instance and idle_instance_that_fits:
execution_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, self.graph[rampart_group.name]['instances']) or \
InstanceGroup.find_largest_idle_instance(self.graph[rampart_group.name]['instances'])

if execution_instance or rampart_group.is_containerized:
if not rampart_group.is_containerized:
execution_instance = idle_instance_that_fits
execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact)
execution_instance.jobs_running += 1
logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
if execution_instance or rampart_group.is_containerized:

execution_instance = self.real_instances[execution_instance.hostname]
self.graph[rampart_group.name]['graph'].add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
@@ -572,6 +600,9 @@ def process_tasks(self, all_sorted_tasks):
def _schedule(self):
finished_wfjs = []
all_sorted_tasks = self.get_tasks()

self.after_lock_init()

if len(all_sorted_tasks) > 0:
# TODO: Deal with
# latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
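
The scheduler now does its capacity bookkeeping against the cached SimpleNamespace objects and only resolves the real Instance row (via self.real_instances) for the job it actually starts. A rough sketch of that accounting step, using the same kind of stand-in object:

```python
from types import SimpleNamespace

# Hypothetical cached instance mirroring the attributes the scheduler updates.
cached_instance = SimpleNamespace(hostname='node1', remaining_capacity=50, jobs_running=2)
task_impact = 20

# Debit the in-memory snapshot so later jobs in the same scheduling pass see the
# reduced capacity without touching the database again.
cached_instance.remaining_capacity = max(0, cached_instance.remaining_capacity - task_impact)
cached_instance.jobs_running += 1

# Only when the task is started is the real ORM object looked up, e.g.
# execution_instance = self.real_instances[cached_instance.hostname]
```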
35 changes: 13 additions & 22 deletions awx/main/tests/unit/models/test_ha.py
@@ -45,19 +45,14 @@ class TestInstanceGroup(object):
(T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"),
])
def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
with mock.patch.object(InstanceGroup,
'instances',
Mock(spec_set=['filter'],
filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
order_by=lambda x: instances))):
ig = InstanceGroup(id=10)
ig = InstanceGroup(id=10)

if instance_fit_index is None:
assert ig.fit_task_to_most_remaining_capacity_instance(task) is None, reason
else:
assert ig.fit_task_to_most_remaining_capacity_instance(task) == \
instances[instance_fit_index], reason
instance_picked = ig.fit_task_to_most_remaining_capacity_instance(task, instances)

if instance_fit_index is None:
assert instance_picked is None, reason
else:
assert instance_picked == instances[instance_fit_index], reason

@pytest.mark.parametrize('instances,instance_fit_index,reason', [
(Is([(0, 100)]), 0, "One idle instance, pick it"),
@@ -70,16 +65,12 @@ def test_find_largest_idle_instance(self, instances, instance_fit_index, reason)
def filter_offline_instances(*args):
return filter(lambda i: i.capacity > 0, instances)

with mock.patch.object(InstanceGroup,
'instances',
Mock(spec_set=['filter'],
filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
order_by=filter_offline_instances))):
ig = InstanceGroup(id=10)
ig = InstanceGroup(id=10)
instances_online_only = filter_offline_instances(instances)

if instance_fit_index is None:
assert ig.find_largest_idle_instance() is None, reason
else:
assert ig.find_largest_idle_instance() == \
instances[instance_fit_index], reason
if instance_fit_index is None:
assert ig.find_largest_idle_instance(instances_online_only) is None, reason
else:
assert ig.find_largest_idle_instance(instances_online_only) == \
instances[instance_fit_index], reason
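
Because the helpers no longer reach through the InstanceGroup.instances relation, tests can pass plain objects straight in, with no mock.patch of the ORM. A pytest-style sketch under the same assumptions; the make_instances helper below is a hypothetical stand-in for the Is(...) fixture used in the real test module:

```python
from types import SimpleNamespace

from awx.main.models import InstanceGroup


def make_instances(capacities):
    # Hypothetical stand-in for the Is(...) helper used by the real tests.
    return [SimpleNamespace(hostname=f'i{n}', capacity=c, remaining_capacity=c, jobs_running=0)
            for n, c in enumerate(capacities)]


def test_fit_prefers_largest_remaining_capacity():
    task = SimpleNamespace(task_impact=30)
    instances = make_instances([10, 50, 40])
    picked = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, instances)
    assert picked is instances[1]
```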
