From c01fb9b9530d4f79ef110f6d665ad97b9663298e Mon Sep 17 00:00:00 2001 From: Sreenath Kamath Date: Sat, 31 Jul 2021 00:58:35 +0530 Subject: [PATCH] Fetching and logging livy session logs for LivyOperrator --- airflow/providers/apache/livy/hooks/livy.py | 62 +++++++++++++++++++ .../providers/apache/livy/operators/livy.py | 1 + .../apache/livy/operators/test_livy.py | 45 +++++++++++++- 3 files changed, 105 insertions(+), 3 deletions(-) diff --git a/airflow/providers/apache/livy/hooks/livy.py b/airflow/providers/apache/livy/hooks/livy.py index c44a7faf672f5..d4fe5989749c0 100644 --- a/airflow/providers/apache/livy/hooks/livy.py +++ b/airflow/providers/apache/livy/hooks/livy.py @@ -243,6 +243,56 @@ def delete_batch(self, session_id: Union[int, str]) -> Any: return response.json() + def get_batch_logs(self, session_id: Union[int, str], log_start_position, log_batch_size) -> Any: + """ + Gets the session logs for a specified batch. + :param session_id: identifier of the batch sessions + :type session_id: int + :param log_start_position: Position from where to pull the logs + :type log_start_position: int + :param log_batch_size: Number of lines to pull in one batch + :type log_batch_size: int + + :return: response body + :rtype: dict + """ + self._validate_session_id(session_id) + log_params = {'from': log_start_position, 'size': log_batch_size} + response = self.run_method(endpoint=f'/batches/{session_id}/log', data=log_params) + try: + response.raise_for_status() + except requests.exceptions.HTTPError as err: + self.log.warning("Got status code %d for session %d", err.response.status_code, session_id) + raise AirflowException( + "Could not fetch the logs for batch with session id: {}. Message: {}".format( + session_id, err.response.text + ) + ) + return response.json() + + def dump_batch_logs(self, session_id: Union[int, str]) -> Any: + """ + Dumps the session logs for a specified batch + + :param session_id: identifier of the batch sessions + :type session_id: int + :return: response body + :rtype: dict + """ + self.log.info("Fetching the logs for batch session with id: %d", session_id) + log_start_line = 0 + log_total_lines = 0 + log_batch_size = 100 + + while log_start_line <= log_total_lines: + # Livy log endpoint is paginated. + response = self.get_batch_logs(session_id, log_start_line, log_batch_size) + log_total_lines = self._parse_request_response(response, 'total') + log_start_line += log_batch_size + log_lines = self._parse_request_response(response, 'log') + for log_line in log_lines: + self.log.info(log_line) + @staticmethod def _validate_session_id(session_id: Union[int, str]) -> None: """ @@ -268,6 +318,18 @@ def _parse_post_response(response: Dict[Any, Any]) -> Any: """ return response.get('id') + @staticmethod + def _parse_request_response(response: Dict[Any, Any], parameter) -> Any: + """ + Parse batch response for batch id + + :param response: response body + :type response: dict + :return: value of parameter + :rtype: Union[int, list] + """ + return response.get(parameter) + @staticmethod def build_post_batch_body( file: str, diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py index 0fbe32b13a8d6..f494568af576c 100644 --- a/airflow/providers/apache/livy/operators/livy.py +++ b/airflow/providers/apache/livy/operators/livy.py @@ -166,6 +166,7 @@ def poll_for_termination(self, batch_id: Union[int, str]) -> None: sleep(self._polling_interval) state = hook.get_batch_state(batch_id) self.log.info("Batch with id %s terminated with state: %s", batch_id, state.value) + hook.dump_batch_logs(batch_id) if state != BatchState.SUCCESS: raise AirflowException(f"Batch {batch_id} did not succeed") diff --git a/tests/providers/apache/livy/operators/test_livy.py b/tests/providers/apache/livy/operators/test_livy.py index 515e37603b8b6..8038f32125ec7 100644 --- a/tests/providers/apache/livy/operators/test_livy.py +++ b/tests/providers/apache/livy/operators/test_livy.py @@ -16,6 +16,7 @@ # under the License. # +import logging import unittest from unittest.mock import MagicMock, patch @@ -32,6 +33,7 @@ mock_livy_client = MagicMock() BATCH_ID = 100 +LOG_RESPONSE = {"total": 3, "log": ['first_line', 'second_line', 'third_line']} class TestLivyOperator(unittest.TestCase): @@ -44,8 +46,12 @@ def setUp(self): ) ) + @patch( + 'airflow.providers.apache.livy.operators.livy.LivyHook.dump_batch_logs', + return_value=None, + ) @patch('airflow.providers.apache.livy.operators.livy.LivyHook.get_batch_state') - def test_poll_for_termination(self, mock_livy): + def test_poll_for_termination(self, mock_livy, mock_dump_logs): state_list = 2 * [BatchState.RUNNING] + [BatchState.SUCCESS] @@ -62,10 +68,15 @@ def side_effect(_): task.poll_for_termination(BATCH_ID) mock_livy.assert_called_with(BATCH_ID) + mock_dump_logs.assert_called_with(BATCH_ID) assert mock_livy.call_count == 3 + @patch( + 'airflow.providers.apache.livy.operators.livy.LivyHook.dump_batch_logs', + return_value=None, + ) @patch('airflow.providers.apache.livy.operators.livy.LivyHook.get_batch_state') - def test_poll_for_termination_fail(self, mock_livy): + def test_poll_for_termination_fail(self, mock_livy, mock_dump_logs): state_list = 2 * [BatchState.RUNNING] + [BatchState.ERROR] @@ -84,14 +95,19 @@ def side_effect(_): task.poll_for_termination(BATCH_ID) mock_livy.assert_called_with(BATCH_ID) + mock_dump_logs.assert_called_with(BATCH_ID) assert mock_livy.call_count == 3 + @patch( + 'airflow.providers.apache.livy.operators.livy.LivyHook.dump_batch_logs', + return_value=None, + ) @patch( 'airflow.providers.apache.livy.operators.livy.LivyHook.get_batch_state', return_value=BatchState.SUCCESS, ) @patch('airflow.providers.apache.livy.operators.livy.LivyHook.post_batch', return_value=BATCH_ID) - def test_execution(self, mock_post, mock_get): + def test_execution(self, mock_post, mock_get, mock_dump_logs): task = LivyOperator( livy_conn_id='livyunittest', file='sparkapp', @@ -104,6 +120,7 @@ def test_execution(self, mock_post, mock_get): call_args = {k: v for k, v in mock_post.call_args[1].items() if v} assert call_args == {'file': 'sparkapp'} mock_get.assert_called_once_with(BATCH_ID) + mock_dump_logs.assert_called_once_with(BATCH_ID) @patch('airflow.providers.apache.livy.operators.livy.LivyHook.post_batch') def test_execution_with_extra_options(self, mock_post): @@ -134,3 +151,25 @@ def test_injected_hook(self): task._livy_hook = def_hook assert task.get_hook() == def_hook + + @patch( + 'airflow.providers.apache.livy.operators.livy.LivyHook.get_batch_state', + return_value=BatchState.SUCCESS, + ) + @patch('airflow.providers.apache.livy.operators.livy.LivyHook.get_batch_logs', return_value=LOG_RESPONSE) + @patch('airflow.providers.apache.livy.operators.livy.LivyHook.post_batch', return_value=BATCH_ID) + def test_log_dump(self, mock_post, mock_get_logs, mock_get): + task = LivyOperator( + livy_conn_id='livyunittest', + file='sparkapp', + dag=self.dag, + task_id='livy_example', + polling_interval=1, + ) + with self.assertLogs(task.get_hook().log, level=logging.INFO) as cm: + task.execute(context={}) + assert 'INFO:airflow.providers.apache.livy.hooks.livy.LivyHook:first_line' in cm.output + assert 'INFO:airflow.providers.apache.livy.hooks.livy.LivyHook:second_line' in cm.output + assert 'INFO:airflow.providers.apache.livy.hooks.livy.LivyHook:third_line' in cm.output + mock_get.assert_called_once_with(BATCH_ID) + mock_get_logs.assert_called_once_with(BATCH_ID, 0, 100)