diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index ad59bcd560b19..d0e8039bb7f19 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -142,7 +142,7 @@ def base_container_is_running(self, pod): wait=tenacity.wait_exponential(), reraise=True ) - def read_pod_logs(self, pod): + def read_pod_logs(self, pod, tail_lines=10): try: return self._client.read_namespaced_pod_log( @@ -150,7 +150,7 @@ def read_pod_logs(self, pod): namespace=pod.namespace, container='base', follow=True, - tail_lines=10, + tail_lines=tail_lines, _preload_content=False ) except BaseHTTPError as e: diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index c1c32658a164a..9ae33ceb2e464 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -105,6 +105,30 @@ def _read(self, ti, try_number, metadata=None): except Exception as e: log = "*** Failed to load local log file: {}\n".format(location) log += "*** {}\n".format(str(e)) + elif conf.get('core', 'executor') == 'KubernetesExecutor': + log += '*** Trying to get logs (last 100 lines) from worker pod {} ***\n\n' \ + .format(ti.hostname) + + try: + from airflow.contrib.kubernetes.kube_client import get_kube_client + + kube_client = get_kube_client() + res = kube_client.read_namespaced_pod_log( + name=ti.hostname, + namespace=conf.get('kubernetes', 'namespace'), + container='base', + follow=False, + tail_lines=100, + _preload_content=False + ) + + for line in res: + log += line.decode() + + except Exception as f: # pylint: disable=broad-except + log += '*** Unable to fetch logs from worker pod {} ***\n{}\n\n'.format( + ti.hostname, str(f) + ) else: url = os.path.join( "http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path