diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index e5bce009874d..eb3f771d506c 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2299,98 +2299,6 @@ kerberos:
       type: boolean
       example: ~
       default: "True"
-elasticsearch:
-  description: ~
-  options:
-    host:
-      description: |
-        Elasticsearch host
-      version_added: 1.10.4
-      type: string
-      example: ~
-      default: ""
-    log_id_template:
-      description: |
-        Format of the log_id, which is used to query for a given tasks logs
-      version_added: 1.10.4
-      type: string
-      example: ~
-      is_template: true
-      default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
-    end_of_log_mark:
-      description: |
-        Used to mark the end of a log stream for a task
-      version_added: 1.10.4
-      type: string
-      example: ~
-      default: "end_of_log"
-    frontend:
-      description: |
-        Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
-        Code will construct log_id using the log_id template from the argument above.
-        NOTE: scheme will default to https if one is not provided
-      version_added: 1.10.4
-      type: string
-      example: "http://localhost:5601/app/kibana#/discover\
-        ?_a=(columns:!(message),query:(language:kuery,query:'log_id: \"{log_id}\"'),sort:!(log.offset,asc))"
-      default: ""
-    write_stdout:
-      description: |
-        Write the task logs to the stdout of the worker, rather than the default files
-      version_added: 1.10.4
-      type: string
-      example: ~
-      default: "False"
-    json_format:
-      description: |
-        Instead of the default log formatter, write the log lines as JSON
-      version_added: 1.10.4
-      type: string
-      example: ~
-      default: "False"
-    json_fields:
-      description: |
-        Log fields to also attach to the json output, if enabled
-      version_added: 1.10.4
-      type: string
-      example: ~
-      default: "asctime, filename, lineno, levelname, message"
-    host_field:
-      description: |
-        The field where host name is stored (normally either `host` or `host.name`)
-      version_added: 2.1.1
-      type: string
-      example: ~
-      default: "host"
-    offset_field:
-      description: |
-        The field where offset is stored (normally either `offset` or `log.offset`)
-      version_added: 2.1.1
-      type: string
-      example: ~
-      default: "offset"
-    index_patterns:
-      description: |
-        Comma separated list of index patterns to use when searching for logs (default: `_all`).
-      version_added: 2.6.0
-      type: string
-      example: something-*
-      default: "_all"
-elasticsearch_configs:
-  description: ~
-  options:
-    use_ssl:
-      description: ~
-      version_added: 1.10.5
-      type: string
-      example: ~
-      default: "False"
-    verify_certs:
-      description: ~
-      version_added: 1.10.5
-      type: string
-      example: ~
-      default: "True"
 sensors:
   description: ~
   options:
diff --git a/airflow/providers/elasticsearch/CHANGELOG.rst b/airflow/providers/elasticsearch/CHANGELOG.rst
index a7daf1475314..b7882d0656e6 100644
--- a/airflow/providers/elasticsearch/CHANGELOG.rst
+++ b/airflow/providers/elasticsearch/CHANGELOG.rst
@@ -27,6 +27,12 @@ Changelog
 ---------
 
+.. note::
+    Upgrade to Elasticsearch 8. The ``ElasticsearchTaskHandler`` and ``ElasticsearchSQLHook`` now use the
+    Elasticsearch 8 package. As explained at https://elasticsearch-py.readthedocs.io/en/stable, Elasticsearch
+    language clients are only backwards compatible with default distributions, and no guarantees are made beyond
+    that; we recommend upgrading the Elasticsearch database to version 8 as well, for compatibility with the client.
+
 5.0.0
 .....
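The note above is the heart of the change: the provider's client and the backing cluster should move to 8.x together. A minimal sketch of what that means for connection strings (illustrative, not part of the patch; assumes a local, security-disabled cluster):

```python
from elasticsearch import Elasticsearch

# elasticsearch-py 7.x accepted bare "host:port" strings or host dicts, e.g.
#   Elasticsearch(["localhost:9200"])
# The 8.x client requires a full URL, scheme included:
client = Elasticsearch("http://localhost:9200")
print(client.info())  # round-trip to verify connectivity
```

This is why the handler's `host` default in the next file changes from `localhost:9200` to `http://localhost:9200`.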
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index ccac90480a00..03bfe247c582 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -30,7 +30,7 @@
 # Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
 import elasticsearch
 import pendulum
-from elasticsearch.exceptions import ElasticsearchException, NotFoundError
+from elasticsearch.exceptions import NotFoundError
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -89,7 +89,7 @@ def __init__(
         json_fields: str,
         host_field: str = "host",
         offset_field: str = "offset",
-        host: str = "localhost:9200",
+        host: str = "http://localhost:9200",
         frontend: str = "localhost:5601",
         index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"),
         es_kwargs: dict | None = conf.getsection("elasticsearch_configs"),
@@ -101,8 +101,8 @@ def __init__(
         super().__init__(base_log_folder, filename_template)
         self.closed = False
 
-        self.client = elasticsearch.Elasticsearch(host.split(";"), **es_kwargs)  # type: ignore[attr-defined]
-
+        self.client = elasticsearch.Elasticsearch(host, **es_kwargs)  # type: ignore[attr-defined]
+        # in airflow.cfg, the Elasticsearch host must now include a scheme, e.g. http://<hostname>:9200
         if USE_PER_RUN_LOG_ID and log_id_template is not None:
             warnings.warn(
                 "Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
@@ -292,27 +292,24 @@ def es_read(self, log_id: str, offset: int | str, metadata: dict) -> list | Elas
         }
 
         try:
-            max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]
+            max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]  # type: ignore
         except NotFoundError as e:
             self.log.exception("The target index pattern %s does not exist", self.index_patterns)
             raise e
-        except ElasticsearchException as e:
-            self.log.exception("Could not get current log size with log_id: %s", log_id)
-            raise e
 
         logs: list[Any] | ElasticSearchResponse = []
         if max_log_line != 0:
             try:
                 query.update({"sort": [self.offset_field]})
-                res = self.client.search(
+                res = self.client.search(  # type: ignore
                     index=self.index_patterns,
                     body=query,
                     size=self.MAX_LINE_PER_PAGE,
                     from_=self.MAX_LINE_PER_PAGE * self.PAGE,
                 )
                 logs = ElasticSearchResponse(self, res)
-            except elasticsearch.exceptions.ElasticsearchException:
-                self.log.exception("Could not read log with log_id: %s", log_id)
+            except Exception as err:
+                self.log.exception("Could not read log with log_id: %s. Exception: %s", log_id, err)
 
         return logs
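One consequence worth calling out: `ElasticsearchException` no longer exists in elasticsearch-py 8.x, which is why the import shrank and the `search` call above settles for a broad `except Exception`. A hedged sketch of the nearest 8.x-native alternative (`ApiError` is the 8.x base class for server-reported errors; this is not what the patch itself does):

```python
from elasticsearch.exceptions import ApiError, NotFoundError

def count_logs(client, index_patterns, query) -> int:
    # Mirrors es_read above: a missing index pattern stays a NotFoundError,
    # anything else the server reports arrives as an ApiError subclass.
    try:
        return client.count(index=index_patterns, body=query)["count"]
    except NotFoundError:
        raise
    except ApiError as err:
        raise RuntimeError(f"could not get current log size: {err}") from err
```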
Exception: %s", log_id, err) return logs diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml index 2b6ffaabf28b..b848c1647ce8 100644 --- a/airflow/providers/elasticsearch/provider.yaml +++ b/airflow/providers/elasticsearch/provider.yaml @@ -53,7 +53,7 @@ versions: dependencies: - apache-airflow>=2.4.0 - apache-airflow-providers-common-sql>=1.3.1 - - elasticsearch>7,<7.15.0 + - elasticsearch>8,<9 integrations: - integration-name: Elasticsearch @@ -72,3 +72,97 @@ connection-types: logging: - airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler + +config: + elasticsearch: + description: ~ + options: + host: + description: | + Elasticsearch host + version_added: 1.10.4 + type: string + example: ~ + default: "" + log_id_template: + description: | + Format of the log_id, which is used to query for a given tasks logs + version_added: 1.10.4 + type: string + example: ~ + is_template: true + default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}" + end_of_log_mark: + description: | + Used to mark the end of a log stream for a task + version_added: 1.10.4 + type: string + example: ~ + default: "end_of_log" + frontend: + description: | + Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id + Code will construct log_id using the log_id template from the argument above. + NOTE: scheme will default to https if one is not provided + version_added: 1.10.4 + type: string + example: "http://localhost:5601/app/kibana#/discover\ + ?_a=(columns:!(message),query:(language:kuery,query:'log_id: \"{log_id}\"'),sort:!(log.offset,asc))" + default: "" + write_stdout: + description: | + Write the task logs to the stdout of the worker, rather than the default files + version_added: 1.10.4 + type: string + example: ~ + default: "False" + json_format: + description: | + Instead of the default log formatter, write the log lines as JSON + version_added: 1.10.4 + type: string + example: ~ + default: "False" + json_fields: + description: | + Log fields to also attach to the json output, if enabled + version_added: 1.10.4 + type: string + example: ~ + default: "asctime, filename, lineno, levelname, message" + host_field: + description: | + The field where host name is stored (normally either `host` or `host.name`) + version_added: 2.1.1 + type: string + example: ~ + default: "host" + offset_field: + description: | + The field where offset is stored (normally either `offset` or `log.offset`) + version_added: 2.1.1 + type: string + example: ~ + default: "offset" + index_patterns: + description: | + Comma separated list of index patterns to use when searching for logs (default: `_all`). + version_added: 2.6.0 + type: string + example: something-* + default: "_all" + elasticsearch_configs: + description: ~ + options: + http_compress: + description: ~ + version_added: 1.10.5 + type: string + example: ~ + default: "False" + verify_certs: + description: ~ + version_added: 1.10.5 + type: string + example: ~ + default: "True" diff --git a/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst b/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst new file mode 100644 index 000000000000..5885c9d91b6e --- /dev/null +++ b/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
diff --git a/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst b/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst
new file mode 100644
index 000000000000..5885c9d91b6e
--- /dev/null
+++ b/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst
@@ -0,0 +1,18 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. include:: ../exts/includes/providers-configurations-ref.rst
diff --git a/docs/apache-airflow-providers-elasticsearch/index.rst b/docs/apache-airflow-providers-elasticsearch/index.rst
index 5a4582412aad..f2b38434d24d 100644
--- a/docs/apache-airflow-providers-elasticsearch/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/index.rst
@@ -43,6 +43,7 @@
     :maxdepth: 1
     :caption: References
 
+    Configuration <configurations-ref>
     Python API <_api/airflow/providers/elasticsearch/index>
 
 .. toctree::
@@ -103,7 +104,7 @@
 PIP package                              Version required
 =======================================  ==================
 ``apache-airflow``                       ``>=2.4.0``
 ``apache-airflow-providers-common-sql``  ``>=1.3.1``
-``elasticsearch``                        ``>7,<7.15.0``
+``elasticsearch``                        ``>8,<9``
 =======================================  ==================
 
 Cross provider package dependencies
diff --git a/docs/apache-airflow/configurations-ref.rst b/docs/apache-airflow/configurations-ref.rst
index f323ea31f0f3..c92a9975e358 100644
--- a/docs/apache-airflow/configurations-ref.rst
+++ b/docs/apache-airflow/configurations-ref.rst
@@ -41,6 +41,7 @@ in the provider's documentation. The pre-installed providers that you may want t
 * :doc:`Configuration Reference for SMTP Provider <apache-airflow-providers-smtp:configurations-ref>`
 * :doc:`Configuration Reference for IMAP Provider <apache-airflow-providers-imap:configurations-ref>`
 * :doc:`Configuration Reference for OpenLineage Provider <apache-airflow-providers-openlineage:configurations-ref>`
+* :doc:`Configuration Reference for Elasticsearch Provider <apache-airflow-providers-elasticsearch:configurations-ref>`
 
 .. note::
     For more information see :doc:`/howto/set-config`.
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 992979362653..990e397e706b 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -358,7 +358,7 @@
     "deps": [
       "apache-airflow-providers-common-sql>=1.3.1",
       "apache-airflow>=2.4.0",
-      "elasticsearch>7,<7.15.0"
+      "elasticsearch>8,<9"
     ],
     "cross-providers-deps": [
       "common.sql"
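The regenerated pin means the provider now assumes an 8.x client at runtime. A quick environment guard, sketched with only the standard library (not part of the patch):

```python
from importlib.metadata import version

major = int(version("elasticsearch").split(".")[0])
if major != 8:
    raise RuntimeError(f"elasticsearch {version('elasticsearch')} installed; this provider expects 8.x")
```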
+ """ + # if hosts are empty, just defer to defaults down the line + if hosts is None: + return [{}] + + hosts = [hosts] + + out = [] + + for host in hosts: + if "://" not in host: + host = f"//{host}" + + parsed_url = urlparse(host) + h = {"host": parsed_url.hostname} + + if parsed_url.port: + h["port"] = parsed_url.port + + if parsed_url.scheme == "https": + h["port"] = parsed_url.port or 443 + h["use_ssl"] = True + + if parsed_url.username or parsed_url.password: + h["http_auth"] = f"{unquote(parsed_url.username)}:{unquote(parsed_url.password)}" + + if parsed_url.path and parsed_url.path != "/": + h["url_prefix"] = parsed_url.path + + out.append(h) + else: + out.append(host) + return out + + def _get_elasticmock(hosts=None, *args, **kwargs): host = _normalize_hosts(hosts)[0] - elastic_key = f"{host.get('host', 'localhost')}:{host.get('port', 9200)}" + elastic_key = f"http://{host.get('host', 'localhost')}:{host.get('port', 9200)}" if elastic_key in ELASTIC_INSTANCES: connection = ELASTIC_INSTANCES.get(elastic_key) diff --git a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py index c4e25d290dc4..b37608232d35 100644 --- a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py +++ b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py @@ -20,10 +20,9 @@ import json from elasticsearch import Elasticsearch -from elasticsearch.client.utils import query_params from elasticsearch.exceptions import NotFoundError -from .utilities import get_random_id +from .utilities import MissingIndexException, get_random_id, query_params # # The MIT License (MIT) @@ -53,7 +52,7 @@ class FakeElasticsearch(Elasticsearch): __documents_dict = None def __init__(self): - super().__init__() + super().__init__("http://localhost:9200") self.__documents_dict = {} @query_params() @@ -327,9 +326,8 @@ def get_source(self, index, doc_type, id, params=None): "version", ) def count(self, index=None, doc_type=None, body=None, params=None, headers=None): - searchable_indexes = self._normalize_index_to_list(index) + searchable_indexes = self._normalize_index_to_list(index, body) searchable_doc_types = self._normalize_doc_type_to_list(doc_type) - i = 0 for searchable_index in searchable_indexes: for document in self.__documents_dict[searchable_index]: @@ -376,7 +374,7 @@ def count(self, index=None, doc_type=None, body=None, params=None, headers=None) "version", ) def search(self, index=None, doc_type=None, body=None, params=None, headers=None): - searchable_indexes = self._normalize_index_to_list(index) + searchable_indexes = self._normalize_index_to_list(index, body) matches = self._find_match(index, doc_type, body) @@ -446,7 +444,7 @@ def suggest(self, body, index=None): return result_dict def _find_match(self, index, doc_type, body): - searchable_indexes = self._normalize_index_to_list(index) + searchable_indexes = self._normalize_index_to_list(index, body) searchable_doc_types = self._normalize_doc_type_to_list(doc_type) must = body["query"]["bool"]["must"][0] # only support one must @@ -477,19 +475,20 @@ def match_must_phrase(document, matches, must): matches.append(document) # Check index(es) exists. 
diff --git a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
index c4e25d290dc4..b37608232d35 100644
--- a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -20,10 +20,9 @@
 import json
 
 from elasticsearch import Elasticsearch
-from elasticsearch.client.utils import query_params
 from elasticsearch.exceptions import NotFoundError
 
-from .utilities import get_random_id
+from .utilities import MissingIndexException, get_random_id, query_params
 
 #
 # The MIT License (MIT)
@@ -53,7 +52,7 @@ class FakeElasticsearch(Elasticsearch):
     __documents_dict = None
 
     def __init__(self):
-        super().__init__()
+        super().__init__("http://localhost:9200")
         self.__documents_dict = {}
 
     @query_params()
@@ -327,9 +326,8 @@ def get_source(self, index, doc_type, id, params=None):
         "version",
     )
     def count(self, index=None, doc_type=None, body=None, params=None, headers=None):
-        searchable_indexes = self._normalize_index_to_list(index)
+        searchable_indexes = self._normalize_index_to_list(index, body)
         searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
-
         i = 0
         for searchable_index in searchable_indexes:
             for document in self.__documents_dict[searchable_index]:
@@ -376,7 +374,7 @@ def count(self, index=None, doc_type=None, body=None, params=None, headers=None)
         "version",
     )
     def search(self, index=None, doc_type=None, body=None, params=None, headers=None):
-        searchable_indexes = self._normalize_index_to_list(index)
+        searchable_indexes = self._normalize_index_to_list(index, body)
 
         matches = self._find_match(index, doc_type, body)
 
@@ -446,7 +444,7 @@ def suggest(self, body, index=None):
         return result_dict
 
     def _find_match(self, index, doc_type, body):
-        searchable_indexes = self._normalize_index_to_list(index)
+        searchable_indexes = self._normalize_index_to_list(index, body)
         searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
 
         must = body["query"]["bool"]["must"][0]  # only support one must
@@ -477,19 +475,19 @@ def match_must_phrase(document, matches, must):
                 matches.append(document)
 
     # Check index(es) exists.
-    def _validate_search_targets(self, targets):
+    def _validate_search_targets(self, targets, body):
         # TODO: support allow_no_indices query parameter
         matches = set()
         for target in targets:
             if target == "_all" or target == "":
                 matches.update(self.__documents_dict)
             elif "*" in target:
                 matches.update(fnmatch.filter(self.__documents_dict, target))
             elif target not in self.__documents_dict:
-                raise NotFoundError(404, f"IndexMissingException[[{target}] missing]")
+                raise MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", body=body)
         return matches
 
-    def _normalize_index_to_list(self, index):
+    def _normalize_index_to_list(self, index, body):
         # Ensure to have a list of index
         if index is None:
             searchable_indexes = self.__documents_dict.keys()
@@ -501,11 +500,8 @@ def _normalize_index_to_list(self, index):
         # Is it the correct exception to use ?
         raise ValueError("Invalid param 'index'")
 
-        return list(
-            self._validate_search_targets(
-                target for index in searchable_indexes for target in index.split(",")
-            )
-        )
+        generator = (target for index in searchable_indexes for target in index.split(","))
+        return list(self._validate_search_targets(generator, body))
 
     @staticmethod
     def _normalize_doc_type_to_list(doc_type):
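Net effect of threading `body` through the index helpers: a query against an index the fake store has never seen now surfaces the request body in the error. Roughly (assuming the test tree is importable; values are made up):

```python
from tests.providers.elasticsearch.log.elasticmock.fake_elasticsearch import FakeElasticsearch

es = FakeElasticsearch()
query = {"query": {"bool": {"must": [{"match": {"log_id": "abc"}}]}}}
es.search(index="does_not_exist", body=query)
# MissingIndexException: IndexMissingException[[does_not_exist] missing]
#   with body {'query': {'bool': {'must': [{'match': {'log_id': 'abc'}}]}}}
```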
+ """ + body_params = kwargs.pop("body_params", None) + body_only_params = set(body_params or ()) - set(es_query_params) + body_name = kwargs.pop("body_name", None) + body_required = kwargs.pop("body_required", False) + type_possible_in_params = "type" in es_query_params + + assert not (body_name and body_params) + + assert not (body_name and body_required) + assert not body_required or body_params + + def _wrapper(func): + @wraps(func) + def _wrapped(*args, **kwargs): + params = (kwargs.pop("params", None) or {}).copy() + headers = {k.lower(): v for k, v in (kwargs.pop("headers", None) or {}).copy().items()} + + if "opaque_id" in kwargs: + headers["x-opaque-id"] = kwargs.pop("opaque_id") + + http_auth = kwargs.pop("http_auth", None) + api_key = kwargs.pop("api_key", None) + + using_body_kwarg = kwargs.get("body", None) is not None + using_positional_args = args and len(args) > 1 + + if type_possible_in_params: + doc_type_in_params = params and "doc_type" in params + doc_type_in_kwargs = "doc_type" in kwargs + + if doc_type_in_params: + params["type"] = params.pop("doc_type") + if doc_type_in_kwargs: + kwargs["type"] = kwargs.pop("doc_type") + + if using_body_kwarg or using_positional_args: + body_only_params_in_use = body_only_params.intersection(kwargs) + if body_only_params_in_use: + params_prose = "', '".join(sorted(body_only_params_in_use)) + plural_params = len(body_only_params_in_use) > 1 + + raise TypeError( + f"The '{params_prose}' parameter{'s' if plural_params else ''} " + f"{'are' if plural_params else 'is'} only serialized in the " + f"request body and can't be combined with the 'body' parameter. " + f"Either stop using the 'body' parameter and use keyword-arguments " + f"only or move the specified parameters into the 'body'. " + f"See https://github.com/elastic/elasticsearch-py/issues/1698 " + f"for more information" + ) + + elif set(body_params or ()).intersection(kwargs): + body = {} + for param in body_params: + value = kwargs.pop(param, None) + if value is not None: + body[param.rstrip("_")] = value + kwargs["body"] = body + + elif body_required: + kwargs["body"] = {} + + if body_name: + if body_name in kwargs: + if using_body_kwarg: + raise TypeError( + f"Can't use '{body_name}' and 'body' parameters together" + f" because '{body_name}' is an alias for 'body'. " + f"Instead you should only use the '{body_name}' " + f"parameter. 
+                    kwargs["body"] = kwargs.pop(body_name)
+
+            if http_auth is not None and api_key is not None:
+                raise ValueError("Only one of 'http_auth' and 'api_key' may be passed at a time")
+            elif http_auth is not None:
+                headers["authorization"] = f"Basic {_base64_auth_header(http_auth)}"
+            elif api_key is not None:
+                headers["authorization"] = f"ApiKey {_base64_auth_header(api_key)}"
+
+            for p in es_query_params + GLOBAL_PARAMS:
+                if p in kwargs:
+                    v = kwargs.pop(p)
+                    if v is not None:
+                        params[p] = _escape(v)
+
+            for p in ("ignore", "request_timeout"):
+                if p in kwargs:
+                    params[p] = kwargs.pop(p)
+            return func(*args, params=params, headers=headers, **kwargs)
+
+        return _wrapped
+
+    return _wrapper
+
+
+def to_str(x, encoding="ascii"):
+    if not isinstance(x, str):
+        return x.decode(encoding)
+    return x
+
+
+def to_bytes(x, encoding="ascii"):
+    if not isinstance(x, bytes):
+        return x.encode(encoding)
+    return x
+
+
+def _base64_auth_header(auth_value):
+    """Takes either a 2-tuple or a base64-encoded string
+    and returns a base64-encoded string to be used
+    as an HTTP authorization header.
+    """
+    if isinstance(auth_value, (list, tuple)):
+        auth_value = base64.b64encode(to_bytes(":".join(auth_value)))
+    return to_str(auth_value)
+
+
+def _escape(value):
+    """
+    Escape a single value of a URL string or a query parameter. If it is a list
+    or tuple, turn it into a comma-separated string first.
+    """
+
+    # make sequences into comma-separated strings
+    if isinstance(value, (list, tuple)):
+        value = ",".join(value)
+
+    # dates and datetimes into isoformat
+    elif isinstance(value, (date, datetime)):
+        value = value.isoformat()
+
+    # make bools into true/false strings
+    elif isinstance(value, bool):
+        value = str(value).lower()
+
+    # don't decode bytestrings
+    elif isinstance(value, bytes):
+        return value
+
+    # encode strings to utf-8
+    if isinstance(value, str):
+        return value.encode("utf-8")
+
+    return str(value)
+
+
+class MissingIndexException(NotFoundError):
+    """Exception representing a missing index."""
+
+    def __init__(self, msg, body):
+        self.msg = msg
+        self.body = body
+
+    def __str__(self):
+        return f"IndexMissingException[[{self.msg}] missing] with body {self.body}"
+
+
+class SearchFailedException(NotFoundError):
+    """Exception representing a search failure."""
+
+    def __init__(self, msg):
+        self.msg = msg
+
+    def __str__(self):
+        return f"SearchFailedException: {self.msg}"
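A design note on the two classes just added: both subclass `NotFoundError`, so existing assertions written against the real client's exception types keep passing, as `test_read_with_missing_index` below relies on. Sketch (the import path assumes the test tree is importable):

```python
import pytest
from elasticsearch.exceptions import NotFoundError

from tests.providers.elasticsearch.log.elasticmock.utilities import MissingIndexException

def test_mock_exception_is_still_a_not_found_error():
    with pytest.raises(NotFoundError, match="IndexMissingException"):
        raise MissingIndexException(msg="nonexistent", body={"query": {}})
```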
"test_index" self.doc_type = "log" self.test_message = "some random stuff" @@ -132,7 +132,7 @@ def test_client(self): def test_client_with_config(self): es_conf = dict(conf.getsection("elasticsearch_configs")) expected_dict = { - "use_ssl": False, + "http_compress": False, "verify_certs": True, } assert es_conf == expected_dict @@ -210,7 +210,7 @@ def test_read_with_patterns_no_match(self, ti): def test_read_with_missing_index(self, ti): ts = pendulum.now() with mock.patch.object(self.es_task_handler, "index_patterns", new="nonexistent,test_*"): - with pytest.raises(elasticsearch.exceptions.NotFoundError, match=r".*nonexistent.*"): + with pytest.raises(elasticsearch.exceptions.NotFoundError, match=r"IndexMissingException.*"): self.es_task_handler.read( ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) @@ -365,7 +365,7 @@ def test_read_as_download_logs(self, ti): def test_read_raises(self, ti): with mock.patch.object(self.es_task_handler.log, "exception") as mock_exception: with mock.patch.object(self.es_task_handler.client, "search") as mock_execute: - mock_execute.side_effect = ElasticsearchException("Failed to read") + mock_execute.side_effect = SearchFailedException("Failed to read") logs, metadatas = self.es_task_handler.read(ti, 1) assert mock_exception.call_count == 1 args, kwargs = mock_exception.call_args