Skip to content
This repository has been archived by the owner on Mar 15, 2023. It is now read-only.

Commit

Permalink
[AIRFLOW-3022] Add volume mount to KubernetesExecutorConfig (apache#3855
Browse files Browse the repository at this point in the history
)

Added volumes and volume_mounts to the KubernetesExecutorConfig so
`volumes` or `secrets` can be mounted to the worker pod.
  • Loading branch information
ckljohn authored and wyndhblb committed Nov 9, 2018
1 parent a37da4f commit f4cb395
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
import os

# Default arguments inherited by every task in the example DAG below.
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
dag_id='example_kubernetes_annotation', default_args=args,
dag_id='example_kubernetes_executor_config', default_args=args,
schedule_interval=None
)

Expand All @@ -36,6 +37,14 @@ def print_stuff():
print("annotated!")


def test_volume_mount():
    """Exercise the executor-config volume mount.

    Writes a marker file into the mounted path, then shells out to
    ``cat`` and asserts a zero exit status to prove the file is
    readable from inside the worker pod.
    """
    marker = '/foo/volume_mount_test.txt'
    with open(marker, 'w') as foo:
        foo.write('Hello')

    # os.system returns the shell's exit status; 0 means `cat` found
    # and read the file successfully.
    exit_status = os.system("cat /foo/volume_mount_test.txt")
    assert exit_status == 0


# You can use annotations on your kubernetes pods!
start_task = PythonOperator(
task_id="start_task", python_callable=print_stuff, dag=dag,
Expand All @@ -45,3 +54,26 @@ def print_stuff():
}
}
)

# You can mount a volume or secret into the worker pod via executor_config:
# "volumes" declares the pod-level volume (here a hostPath on the node) and
# "volume_mounts" attaches it inside the container at mountPath, so
# test_volume_mount can write/read under /foo/.
# NOTE(review): task_id "four_task" looks like a copy/paste leftover from
# another example — confirm the intended name.
second_task = PythonOperator(
    task_id="four_task", python_callable=test_volume_mount, dag=dag,
    executor_config={
        "KubernetesExecutor": {
            "volumes": [
                {
                    "name": "test-volume",
                    "hostPath": {"path": "/tmp/"},
                },
            ],
            "volume_mounts": [
                {
                    "mountPath": "/foo/",
                    "name": "test-volume",
                },
            ]
        }
    }
)

# Run the annotation example first, then the volume-mount check.
start_task.set_downstream(second_task)
13 changes: 10 additions & 3 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class KubernetesExecutorConfig:
def __init__(self, image=None, image_pull_policy=None, request_memory=None,
request_cpu=None, limit_memory=None, limit_cpu=None,
gcp_service_account_key=None, node_selectors=None, affinity=None,
annotations=None):
annotations=None, volumes=None, volume_mounts=None):
self.image = image
self.image_pull_policy = image_pull_policy
self.request_memory = request_memory
Expand All @@ -51,15 +51,18 @@ def __init__(self, image=None, image_pull_policy=None, request_memory=None,
self.node_selectors = node_selectors
self.affinity = affinity
self.annotations = annotations
self.volumes = volumes
self.volume_mounts = volume_mounts

def __repr__(self):
return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \
"limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \
"node_selectors={}, affinity={}, annotations={})" \
"node_selectors={}, affinity={}, annotations={}, volumes={}, " \
"volume_mounts={})" \
.format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy,
self.request_memory, self.request_cpu, self.limit_memory,
self.limit_cpu, self.gcp_service_account_key, self.node_selectors,
self.affinity, self.annotations)
self.affinity, self.annotations, self.volumes, self.volume_mounts)

@staticmethod
def from_dict(obj):
Expand All @@ -83,6 +86,8 @@ def from_dict(obj):
node_selectors=namespaced.get('node_selectors', None),
affinity=namespaced.get('affinity', None),
annotations=namespaced.get('annotations', {}),
volumes=namespaced.get('volumes', []),
volume_mounts=namespaced.get('volume_mounts', []),
)

def as_dict(self):
Expand All @@ -97,6 +102,8 @@ def as_dict(self):
'node_selectors': self.node_selectors,
'affinity': self.affinity,
'annotations': self.annotations,
'volumes': self.volumes,
'volume_mounts': self.volume_mounts,
}


Expand Down
2 changes: 2 additions & 0 deletions airflow/contrib/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ def _construct_volume(name, claim):
def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date,
airflow_command, kube_executor_config):
volumes, volume_mounts = self.init_volumes_and_mounts()
volumes += kube_executor_config.volumes
volume_mounts += kube_executor_config.volume_mounts
worker_init_container_spec = self._get_init_containers(
copy.deepcopy(volume_mounts))
resources = Resources(
Expand Down
4 changes: 2 additions & 2 deletions tests/contrib/minikube/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def start_dag(self, dag_id, host):

def test_integration_run_dag(self):
host = get_minikube_host()
dag_id = 'example_kubernetes_annotation'
dag_id = 'example_kubernetes_executor_config'

result_json = self.start_dag(dag_id=dag_id, host=host)

Expand All @@ -181,7 +181,7 @@ def test_integration_run_dag(self):

def test_integration_run_dag_with_scheduler_failure(self):
host = get_minikube_host()
dag_id = 'example_kubernetes_annotation'
dag_id = 'example_kubernetes_executor_config'

result_json = self.start_dag(dag_id=dag_id, host=host)

Expand Down

0 comments on commit f4cb395

Please sign in to comment.