Skip to content

Commit

Permalink
[CP][AIRFLOW-6561][EWT-290]: Adding priority class and default resour…
Browse files Browse the repository at this point in the history
…ce for worker pod. (apache#47)
  • Loading branch information
abhishekbafna authored May 6, 2020
1 parent e9642c2 commit d5d0a07
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 4 deletions.
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,7 @@ fs_group =
# The worker pods will be given these static labels, as well as some additional dynamic labels
# to identify the task.
# Should be supplied in the format: key = value

[kubernetes_worker_resources]
# EWT-290: This is added adhoc basis to configure the Airflow worker resources.
# This should be removed when the similar functionality is available with Airflow upgrade.
1 change: 1 addition & 0 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(self):
)
self.kube_node_selectors = configuration_dict.get('kubernetes_node_selectors', {})
self.kube_annotations = configuration_dict.get('kubernetes_annotations', {})
self.kube_worker_resources = configuration_dict.get('kubernetes_worker_resources', {})
self.kube_labels = configuration_dict.get('kubernetes_labels', {})
self.delete_worker_pods = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,8 @@ def _apply_env_from(pod, req):
}
}
)

@staticmethod
def extract_priority_class(pod, req):
if pod.priority_class:
req['spec']['priorityClassName'] = pod.priority_class
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def create(self, pod):
self.extract_tolerations(pod, req)
self.extract_security_context(pod, req)
self.extract_dnspolicy(pod, req)
self.extract_priority_class(pod, req)
return req


Expand Down Expand Up @@ -135,4 +136,5 @@ def create(self, pod):
self.extract_tolerations(pod, req)
self.extract_security_context(pod, req)
self.extract_dnspolicy(pod, req)
self.extract_priority_class(pod, req)
return req
4 changes: 3 additions & 1 deletion airflow/contrib/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ def __init__(
security_context=None,
configmaps=None,
pod_runtime_info_envs=None,
dnspolicy=None
dnspolicy=None,
priority_class=None
):
self.image = image
self.envs = envs or {}
Expand Down Expand Up @@ -141,3 +142,4 @@ def __init__(
self.configmaps = configmaps or []
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy
self.priority_class = priority_class
16 changes: 14 additions & 2 deletions airflow/contrib/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,16 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
limit_cpu=kube_executor_config.limit_cpu,
limit_gpu=kube_executor_config.limit_gpu
)
# EWT-290 add priorityClassName and cpu and memory resource into pod spec definition.
# The priorityClassName is set as the env (AIRFLOW_POD_PRIORITY_CLASS).
# The cpu and memory resource are set into the airflow.cfg file under 'kube_worker_resources'
resources = Resources(
request_memory=self.kube_config.kube_worker_resources.get('request_memory'),
request_cpu=self.kube_config.kube_worker_resources.get('request_cpu'),
limit_memory=self.kube_config.kube_worker_resources.get('limit_memory'),
limit_cpu=self.kube_config.kube_worker_resources.get('limit_cpu'),
) if resources.is_empty_resource_request() else resources

gcp_sa_key = kube_executor_config.gcp_service_account_key
annotations = dict(kube_executor_config.annotations) or self.kube_config.kube_annotations
if gcp_sa_key:
Expand All @@ -339,6 +349,7 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
affinity = kube_executor_config.affinity or self.kube_config.kube_affinity
tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations

environment = self._get_environment()
return Pod(
namespace=namespace,
name=pod_id,
Expand All @@ -353,7 +364,7 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
'execution_date': execution_date,
'try_number': str(try_number),
}),
envs=self._get_environment(),
envs=environment,
secrets=self._get_secrets(),
service_account_name=self.kube_config.worker_service_account_name,
image_pull_secrets=self.kube_config.image_pull_secrets,
Expand All @@ -367,5 +378,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
affinity=affinity,
tolerations=tolerations,
security_context=self._get_security_context(),
configmaps=self._get_configmaps()
configmaps=self._get_configmaps(),
priority_class=environment.get('AIRFLOW_POD_PRIORITY_CLASS', None)
)
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
# under the License.
#

version = '1.10.4+twtr9'
version = '1.10.4+twtr10'

0 comments on commit d5d0a07

Please sign in to comment.