diff --git a/airflow/providers/elasticsearch/log/es_response.py b/airflow/providers/elasticsearch/log/es_response.py
new file mode 100644
index 000000000000..9a13847a82f7
--- /dev/null
+++ b/airflow/providers/elasticsearch/log/es_response.py
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+
+def _wrap(val):
+    if isinstance(val, dict):
+        return AttributeDict(val)
+    return val
+
+
+class AttributeList:
+    """Helper class to provide attribute-like access to list objects."""
+
+    def __init__(self, _list):
+        if not isinstance(_list, list):
+            _list = list(_list)
+        self._l_ = _list
+
+    def __getitem__(self, k):
+        val = self._l_[k]
+        # A slice of the underlying list must stay wrapped; check the key,
+        # not the value (indexing with a slice returns a plain list).
+        if isinstance(k, slice):
+            return AttributeList(val)
+        return _wrap(val)
+
+    def __iter__(self):
+        return map(_wrap, self._l_)
+
+    def __bool__(self):
+        return bool(self._l_)
+
+
+class AttributeDict:
+    """Helper class to provide attribute-like access to dictionary objects."""
+
+    def __init__(self, d):
+        super().__setattr__("_d_", d)
+
+    def __getattr__(self, attr_name):
+        try:
+            return self.__getitem__(attr_name)
+        except KeyError:
+            raise AttributeError(f"{self.__class__.__name__!r} object has no attribute {attr_name!r}")
+
+    def __getitem__(self, key):
+        return _wrap(self._d_[key])
+
+    def to_dict(self):
+        return self._d_
+
+
+class Hit(AttributeDict):
+    """
+    The Hit class is used to manage and access elements in a document.
+
+    It inherits from the AttributeDict class and provides
+    attribute-like access to its elements, similar to a dictionary.
+    """
+
+    def __init__(self, document):
+        data = {}
+        if "_source" in document:
+            data = document["_source"]
+        if "fields" in document:
+            data.update(document["fields"])
+
+        super().__init__(data)
+        super().__setattr__("meta", HitMeta(document))
+
+
+class HitMeta(AttributeDict):
+    """
+    The HitMeta class is used to manage and access metadata of a document.
+
+    This class inherits from the AttributeDict class and provides
+    attribute-like access to its elements.
+    """
+
+    def __init__(self, document, exclude=("_source", "_fields")):
+        d = {k[1:] if k.startswith("_") else k: v for (k, v) in document.items() if k not in exclude}
+        if "type" in d:
+            # make sure we are consistent everywhere in python
+            d["doc_type"] = d.pop("type")
+        super().__init__(d)
+
+
+class ElasticSearchResponse(AttributeDict):
+    """
+    The ElasticSearchResponse class is used to manage and access the response from an Elasticsearch search.
+
+    This class can be iterated over directly to access hits in the response. Indexing the class instance
+    with an integer or slice will also access the hits. The class also evaluates to True
+    if there are any hits in the response.
+
+    The hits property returns an AttributeList of hits in the response, with each hit transformed into
+    an instance of the doc_class if provided.
+
+    The response parameter stores the dictionary returned by the Elasticsearch client search method.
+    """
+
+    def __init__(self, search, response, doc_class=None):
+        super().__setattr__("_search", search)
+        super().__setattr__("_doc_class", doc_class)
+        super().__init__(response)
+
+    def __iter__(self):
+        return iter(self.hits)
+
+    def __getitem__(self, key):
+        if isinstance(key, (slice, int)):
+            return self.hits[key]
+        return super().__getitem__(key)
+
+    def __bool__(self):
+        return bool(self.hits)
+
+    @property
+    def hits(self):
+        """
+        This property provides access to the hits (i.e., the results) of the Elasticsearch response.
+
+        The hits are represented as an `AttributeList` of `Hit` instances, which allow for easy,
+        attribute-like access to the hit data.
+
+        The hits are lazily loaded, meaning they're not processed until this property is accessed.
+        Upon first access, the hits data from the response is processed using the `_get_result` method
+        of the associated `Search` instance (i.e., an instance of the ElasticsearchTaskHandler class),
+        and the results are stored for future accesses.
+
+        Each hit also includes all the additional data present in the "hits" field of the response,
+        accessible as attributes of the hit.
+        """
+        if not hasattr(self, "_hits"):
+            h = self._d_["hits"]
+
+            try:
+                hits = AttributeList(map(self._search._get_result, h["hits"]))
+            except AttributeError as e:
+                raise TypeError("Could not parse hits.") from e
+
+            super().__setattr__("_hits", hits)
+            for k in h:
+                setattr(self._hits, k, _wrap(h[k]))
+        return self._hits
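Reviewer note: a minimal sketch of how these wrappers compose. The response dict and the _FakeSearch helper below are invented for illustration; any object exposing a _get_result method (such as ElasticsearchTaskHandler) works as the `search` argument:

    from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit


    class _FakeSearch:
        # Stand-in for ElasticsearchTaskHandler; only _get_result is required by the wrapper.
        def _get_result(self, hit):
            return Hit(hit)


    raw = {
        "hits": {
            "total": {"relation": "eq", "value": 1},
            "hits": [{"_id": "1", "_index": "logs", "_source": {"message": "hello", "offset": 1}}],
        }
    }
    response = ElasticSearchResponse(_FakeSearch(), raw)

    assert bool(response)  # truthy because the response contains at least one hit
    first = response[0]  # integer (or slice) indexing is routed through .hits
    assert first.message == "hello"  # _source fields become attributes on the Hit
    assert first.meta.id == "1"  # underscore-prefixed keys surface through HitMeta
    assert response.hits.total.value == 1  # sibling keys of "hits" are attached to the AttributeList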
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 1bc071d2c298..2a780b622e1d 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -24,7 +24,7 @@
 from datetime import datetime
 from operator import attrgetter
 from time import time
-from typing import TYPE_CHECKING, List, Tuple
+from typing import TYPE_CHECKING, Any, Callable, List, Tuple
 from urllib.parse import quote
 
 # Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
@@ -37,6 +37,7 @@
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter
+from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit
 from airflow.utils import timezone
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
@@ -52,34 +53,6 @@
 USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
 
 
-class Log:
-    """wrapper class to mimic the attributes in Search class used in elasticsearch_dsl.Search."""
-
-    def __init__(self, offset):
-        self.offset = offset
-
-
-class ElasticSearchResponse:
-    """wrapper class to mimic the Search class used in elasticsearch_dsl.Search."""
-
-    def __init__(self, **kwargs):
-        # Store all provided keyword arguments as attributes of this object
-        for key, value in kwargs.items():
-            if key == "log":
-                setattr(self, key, Log(**value))
-            else:
-                setattr(self, key, value)
-
-    def to_dict(self):
-        result = {}
-        for key in self.__dict__.keys():
-            if key == "log":
-                result[key] = self.__dict__[key].__dict__
-            else:
-                result[key] = self.__dict__[key]
-        return result
-
-
 class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
     """
     ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch.
@@ -150,6 +123,8 @@ def __init__(
         self.formatter: logging.Formatter
         self.handler: logging.FileHandler | logging.StreamHandler  # type: ignore[assignment]
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
 
     def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
         with create_session() as session:
@@ -299,7 +274,7 @@ def _format_msg(self, log_line):
         # Just a safe-guard to preserve backwards-compatibility
         return log_line.message
 
-    def es_read(self, log_id: str, offset: str, metadata: dict) -> list:
+    def es_read(self, log_id: str, offset: str, metadata: dict) -> list | ElasticSearchResponse:
         """
         Return the logs matching log_id in Elasticsearch and next offset or ''.
 
@@ -307,17 +282,13 @@ def es_read(self, log_id: str, offset: str, metadata: dict) -> list | ElasticSearchResponse:
         :param offset: the offset start to read log from.
         :param metadata: log metadata, used for streaming log download.
         """
-        # Offset is the unique key for sorting logs given log_id.
-        query = {
+        query: dict[Any, Any] = {
             "query": {
                 "bool": {
-                    "must": [
-                        {"match_phrase": {"log_id": log_id}},
-                        {"range": {self.offset_field: {"gt": int(offset)}}},
-                    ]
+                    "filter": [{"range": {self.offset_field: {"gt": int(offset)}}}],
+                    "must": [{"match_phrase": {"log_id": log_id}}],
                 }
-            },
-            "sort": [{self.offset_field: {"order": "asc"}}],
+            }
         }
 
         try:
@@ -329,21 +300,17 @@ def es_read(self, log_id: str, offset: str, metadata: dict) -> list | ElasticSearchResponse:
             self.log.exception("Could not get current log size with log_id: %s", log_id)
             raise e
 
-        logs = []
+        logs: list[Any] | ElasticSearchResponse = []
         if max_log_line != 0:
             try:
+                query.update({"sort": [self.offset_field]})
                 res = self.client.search(
                     index=self.index_patterns,
                     body=query,
                     size=self.MAX_LINE_PER_PAGE,
                     from_=self.MAX_LINE_PER_PAGE * self.PAGE,
                 )
-                logs = [
-                    ElasticSearchResponse(
-                        **unwrap_response(response),
-                    )
-                    for response in res["hits"]["hits"]
-                ]
+                logs = ElasticSearchResponse(self, res)
             except elasticsearch.exceptions.ElasticsearchException:
                 self.log.exception("Could not read log with log_id: %s", log_id)
 
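For reference, once the `sort` key is merged in just before the search call, the request body is equivalent to the sketch below (the log_id and offset values are made up, and the default offset_field of "offset" is assumed). Keeping the range clause in filter context spares it from scoring and lets Elasticsearch cache it, while the match_phrase clause in must still scores:

    # Hypothetical final request body for log_id="example-dag-task-run--1-1" and offset=0.
    query = {
        "query": {
            "bool": {
                # filter context: not scored and cacheable; ideal for the numeric offset check
                "filter": [{"range": {"offset": {"gt": 0}}}],
                "must": [{"match_phrase": {"log_id": "example-dag-task-run--1-1"}}],
            }
        },
        # added via query.update(...) right before client.search(); a bare field name sorts ascending
        "sort": ["offset"],
    }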
@@ -448,6 +415,99 @@ def supports_external_link(self) -> bool:
         """Whether we can support external links."""
         return bool(self.frontend)
 
+    def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> type[Hit]:
+        """
+        Resolve nested hits from Elasticsearch by iteratively navigating the `_nested` field.
+
+        The result is used to fetch the appropriate document class to handle the hit.
+        This method can be used with nested Elasticsearch fields, which are structured
+        as dictionaries with "field" and "_nested" keys.
+        """
+        doc_class = Hit
+
+        nested_path: list[str] = []
+        nesting = hit["_nested"]
+        while nesting and "field" in nesting:
+            nested_path.append(nesting["field"])
+            nesting = nesting.get("_nested")
+        nested_path_str = ".".join(nested_path)
+
+        if hasattr(parent_class, "_index"):
+            nested_field = parent_class._index.resolve_field(nested_path_str)
+
+            if nested_field is not None:
+                return nested_field._doc_class
+
+        return doc_class
+
+    def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
+        """
+        Process a hit (i.e., a result) from an Elasticsearch response and transform it into
+        an appropriate class instance.
+
+        The transformation depends on the contents of the hit. If the document in hit contains a nested field,
+        the '_resolve_nested' method is used to determine the appropriate class (based on the nested path).
+        If the hit has a document type that is present in the '_doc_type_map', the corresponding class is
+        used. If not, the method iterates over the '_doc_type' classes and uses the first one whose '_matches'
+        method returns True for the hit.
+
+        If the hit contains any 'inner_hits', these are also processed into 'ElasticSearchResponse' instances
+        using the determined class.
+
+        If the determined class has a 'from_es' method, it is used to transform the hit.
+        Finally, the transformed hit is returned.
+
+        An example of the hit argument:
+
+        {'_id': 'jdeZT4kBjAZqZnexVUxk',
+         '_index': '.ds-filebeat-8.8.2-2023.07.09-000001',
+         '_score': 2.482621,
+         '_source': {'@timestamp': '2023-07-13T14:13:15.140Z',
+                     'asctime': '2023-07-09T07:47:43.907+0000',
+                     'container': {'id': 'airflow'},
+                     'dag_id': 'example_bash_operator',
+                     'ecs': {'version': '8.0.0'},
+                     'execution_date': '2023_07_09T07_47_32_000000',
+                     'filename': 'taskinstance.py',
+                     'input': {'type': 'log'},
+                     'levelname': 'INFO',
+                     'lineno': 1144,
+                     'log': {'file': {'path': "/opt/airflow/Documents/GitHub/airflow/logs/
+                             dag_id=example_bash_operator'/run_id=owen_run_run/
+                             task_id=run_after_loop/attempt=1.log"},
+                             'offset': 0},
+                     'log.offset': 1688888863907337472,
+                     'log_id': 'example_bash_operator-run_after_loop-owen_run_run--1-1',
+                     'message': 'Dependencies all met for dep_context=non-requeueable '
+                                'deps ti=<TaskInstance: example_bash_operator.run_after_loop '
+                                'owen_run_run [queued]>',
+                     'task_id': 'run_after_loop',
+                     'try_number': '1'},
+         '_type': '_doc'}
+        """
+        doc_class = Hit
+        dt = hit.get("_type")
+
+        if "_nested" in hit:
+            doc_class = self._resolve_nested(hit, parent_class)
+
+        elif dt in self._doc_type_map:
+            doc_class = self._doc_type_map[dt]
+
+        else:
+            for doc_type in self._doc_type:
+                if hasattr(doc_type, "_matches") and doc_type._matches(hit):
+                    doc_class = doc_type
+                    break
+
+        for t in hit.get("inner_hits", ()):
+            hit["inner_hits"][t] = ElasticSearchResponse(self, hit["inner_hits"][t], doc_class=doc_class)
+
+        # callback should get the Hit class if "from_es" is not defined
+        callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
+        return callback(hit)
+
 
 def getattr_nested(obj, item, default):
     """
@@ -462,33 +522,3 @@ def getattr_nested(obj, item, default):
         return attrgetter(item)(obj)
     except AttributeError:
         return default
-
-
-def unwrap_response(res):
-    source = res["_source"]
-    transformed = {
-        "log_id": source.get("log_id"),
-        "message": source.get("message"),
-        "meta": {
-            "id": res.get("_id"),
-            "index": res.get("_index"),
-            "version": res.get("_version"),
-            "headers": res.get("_headers"),
-        },
-    }
-    if "offset" in source:
-        transformed["offset"] = source["offset"]
-    if "asctime" in source:
-        transformed["asctime"] = source["asctime"]
-    if "filename" in source:
-        transformed["filename"] = source["filename"]
-    if "host" in source:
-        transformed["host"] = source["host"]
-    if "levelname" in source:
-        transformed["levelname"] = source["levelname"]
-    if "lineno" in source:
-        transformed["lineno"] = source["lineno"]
-    if "log" in source:
-        transformed["log"] = source["log"]
-
-    return transformed
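Reviewer note: `_doc_type_map` and `_doc_type` start out empty, so `_get_result` always falls back to the plain Hit class today. The dispatch contract it implements can be illustrated with a hypothetical subclass (all names below are invented; `handler` stands for an existing ElasticsearchTaskHandler instance):

    from airflow.providers.elasticsearch.log.es_response import Hit


    class AirflowLogHit(Hit):
        # _matches decides whether this class should own a raw hit dictionary.
        @classmethod
        def _matches(cls, hit):
            return "log_id" in hit.get("_source", {})

        # When present, from_es is picked up as the construction callback.
        @classmethod
        def from_es(cls, hit):
            return cls(hit)


    # handler: an existing ElasticsearchTaskHandler instance (hypothetical registration)
    handler._doc_type.append(AirflowLogHit)
    # Hits whose _source carries a log_id now come back as AirflowLogHit instances.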
{"version": "8.0.0"}, + "execution_date": "2023_07_09T07_47_32_000000", + "filename": "taskinstance.py", + "input": {"type": "log"}, + "levelname": "INFO", + "lineno": 1144, + "log": { + "file": { + "path": "/opt/airflow/Documents/GitHub/airflow/logs/" + "dag_id=example_bash_operator'" + "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log" + }, + "offset": 0, + }, + "log.offset": 1688888863907337472, + "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1", + "message": "Dependencies all met for " + "dep_context=non-requeueable deps " + "ti=", + "task_id": "run_after_loop", + "try_number": "1", + }, + "_type": "_doc", + }, + { + "_id": "qteZT4kBjAZqZnexVUxl", + "_index": ".ds-filebeat-8.8.2-2023.07.09-000001", + "_score": 2.482621, + "_source": { + "@timestamp": "2023-07-13T14:13:15.141Z", + "asctime": "2023-07-09T07:47:43.917+0000", + "container": {"id": "airflow"}, + "dag_id": "example_bash_operator", + "ecs": {"version": "8.0.0"}, + "execution_date": "2023_07_09T07_47_32_000000", + "filename": "taskinstance.py", + "input": {"type": "log"}, + "levelname": "INFO", + "lineno": 1347, + "log": { + "file": { + "path": "/opt/airflow/Documents/GitHub/airflow/logs/" + "dag_id=example_bash_operator" + "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log" + }, + "offset": 988, + }, + "log.offset": 1688888863917961216, + "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1", + "message": "Starting attempt 1 of 1", + "task_id": "run_after_loop", + "try_number": "1", + }, + "_type": "_doc", + }, + { + "_id": "v9eZT4kBjAZqZnexVUx2", + "_index": ".ds-filebeat-8.8.2-2023.07.09-000001", + "_score": 2.482621, + "_source": { + "@timestamp": "2023-07-13T14:13:15.143Z", + "asctime": "2023-07-09T07:47:43.928+0000", + "container": {"id": "airflow"}, + "dag_id": "example_bash_operator", + "ecs": {"version": "8.0.0"}, + "execution_date": "2023_07_09T07_47_32_000000", + "filename": "taskinstance.py", + "input": {"type": "log"}, + "levelname": "INFO", + "lineno": 1368, + "log": { + "file": { + "path": "/opt/airflow/Documents/GitHub/airflow/logs/" + "dag_id=example_bash_operator" + "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log" + }, + "offset": 1372, + }, + "log.offset": 1688888863928218880, + "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1", + "message": "Executing on 2023-07-09 " + "07:47:32+00:00", + "task_id": "run_after_loop", + "try_number": "1", + }, + "_type": "_doc", + }, + ], + "max_score": 2.482621, + "total": {"relation": "eq", "value": 36}, + }, + "timed_out": False, + "took": 7, + } + @query_params( "consistency", "op_type", @@ -291,7 +404,6 @@ def search(self, index=None, doc_type=None, body=None, params=None, headers=None "consistency", "parent", "refresh", "replication", "routing", "timeout", "version", "version_type" ) def delete(self, index, doc_type, id, params=None, headers=None): - found = False if index in self.__documents_dict: diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index d402cec8aa75..93137ff2b2d6 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -32,6 +32,7 @@ from elasticsearch.exceptions import ElasticsearchException from airflow.configuration import conf +from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler, getattr_nested 
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index d402cec8aa75..93137ff2b2d6 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -32,6 +32,7 @@
 from elasticsearch.exceptions import ElasticsearchException
 
 from airflow.configuration import conf
+from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse
 from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler, getattr_nested
 from airflow.utils import timezone
 from airflow.utils.state import DagRunState, TaskInstanceState
@@ -103,6 +104,27 @@ def setup_method(self, method):
     def teardown_method(self):
         shutil.rmtree(self.local_log_location.split(os.path.sep)[0], ignore_errors=True)
 
+    def test_es_response(self):
+        sample_response = self.es.sample_log_response()
+        es_response = ElasticSearchResponse(self.es_task_handler, sample_response)
+        logs_by_host = self.es_task_handler._group_logs_by_host(es_response)
+
+        def concat_logs(lines):
+            log_range = (
+                (len(lines) - 1) if lines[-1].message == self.es_task_handler.end_of_log_mark else len(lines)
+            )
+            return "\n".join(self.es_task_handler._format_msg(lines[i]) for i in range(log_range))
+
+        for hosted_log in logs_by_host.values():
+            message = concat_logs(hosted_log)
+
+            assert (
+                message == "Dependencies all met for dep_context=non-requeueable"
+                " deps ti=<TaskInstance: example_bash_operator.run_after_loop owen_run_run [queued]>\n"
+                "Starting attempt 1 of 1\n"
+                "Executing <Task(BashOperator): run_after_loop> on 2023-07-09 07:47:32+00:00"
+            )
+
     def test_client(self):
         assert isinstance(self.es_task_handler.client, elasticsearch.Elasticsearch)
         assert self.es_task_handler.index_patterns == "_all"