From 45721e80f5701ce62c9cdba820c160e943f495af Mon Sep 17 00:00:00 2001 From: Ryan Blakley Date: Wed, 30 Nov 2022 19:58:33 -0500 Subject: [PATCH] feat: Add timeout to datasources (#3598) * Added timeout using signals in the datasource invoke function, when the timeout is hit it raises a TimeoutException. * Added code in the dehydrate to properly dehydrate caught exceptions, and simplified it to be more readable. Signed-off-by: Ryan Blakley Signed-off-by: Ryan Blakley (cherry picked from commit ff98e30cd0b8ddbceae3dcc49b8c8b439d31a69e) --- insights/core/dr.py | 33 ++++++++ insights/core/plugins.py | 33 +++++++- insights/core/serde.py | 28 ++++--- .../specs/datasources/container/__init__.py | 2 +- insights/tests/core/test_dr.py | 54 +++++++++++++ .../datasources/test_datasource_timeout.py | 80 +++++++++++++++++++ insights/tests/test_add_exception.py | 4 +- insights/tests/test_serde.py | 2 +- 8 files changed, 216 insertions(+), 20 deletions(-) create mode 100644 insights/tests/core/test_dr.py create mode 100644 insights/tests/datasources/test_datasource_timeout.py diff --git a/insights/core/dr.py b/insights/core/dr.py index fbff5a6fdf..664c6811b4 100644 --- a/insights/core/dr.py +++ b/insights/core/dr.py @@ -424,6 +424,39 @@ def get_at_least_one(comp): return get_requires(component) + get_at_least_one(component) +def is_registry_point(component): + return type(component).__name__ == "RegistryPoint" + + +def get_registry_points(component): + """ + Loop through the dependency graph to identify the corresponding spec registry + points for the component. This is primarily used by datasources and returns a + `set`. In most cases only one registry point will be included in the set, but + in some cases more than one. + + Args: + component (callable): The component object + + Returns: + (set): A list of the registry points found. + """ + reg_points = set() + + if is_registry_point(component): + reg_points.add(component) + else: + for dep in get_dependents(component): + if is_registry_point(dep): + reg_points.add(dep) + else: + dep_reg_pts = get_registry_points(dep) + if dep_reg_pts: + reg_points.update(dep_reg_pts) + + return reg_points + + def get_subgraphs(graph=None): """ Given a graph of possibly disconnected components, generate all graphs of diff --git a/insights/core/plugins.py b/insights/core/plugins.py index 92815a8948..2d9c838582 100644 --- a/insights/core/plugins.py +++ b/insights/core/plugins.py @@ -27,6 +27,7 @@ from __future__ import print_function import logging +import signal import traceback from pprint import pformat @@ -44,6 +45,11 @@ class ContentException(dr.SkipComponent): pass +class TimeoutException(Exception): + """ Raised whenever a :class:`datasource` hits the set timeout value. """ + pass + + class PluginType(dr.ComponentType): """ PluginType is the base class of plugin types like datasource, rule, etc. @@ -81,21 +87,42 @@ class datasource(PluginType): Decorates a component that one or more :class:`insights.core.Parser` subclasses will consume. """ + filterable = False multi_output = False raw = False - filterable = False + + def _handle_timeout(self, signum, frame): + raise TimeoutException("Datasource spec {ds_name} timed out after {secs} seconds!".format( + ds_name=dr.get_name(self.component), secs=self.timeout)) def invoke(self, broker): + # Grab the timeout from the decorator, or use the default of 120. + self.timeout = getattr(self, "timeout", 120) + + signal.signal(signal.SIGALRM, self._handle_timeout) + signal.alarm(self.timeout) try: return self.component(broker) except ContentException as ce: log.debug(ce) - broker.add_exception(self.component, ce, traceback.format_exc()) + ce_tb = traceback.format_exc() + for reg_spec in dr.get_registry_points(self.component): + broker.add_exception(reg_spec, ce, ce_tb) raise dr.SkipComponent() except CalledProcessError as cpe: log.debug(cpe) - broker.add_exception(self.component, cpe, traceback.format_exc()) + cpe_tb = traceback.format_exc() + for reg_spec in dr.get_registry_points(self.component): + broker.add_exception(reg_spec, cpe, cpe_tb) + raise dr.SkipComponent() + except TimeoutException as te: + log.debug(te) + te_tb = traceback.format_exc() + for reg_spec in dr.get_registry_points(self.component): + broker.add_exception(reg_spec, te, te_tb) raise dr.SkipComponent() + finally: + signal.alarm(0) class parser(PluginType): diff --git a/insights/core/serde.py b/insights/core/serde.py index d068da6717..9c0138aa5d 100644 --- a/insights/core/serde.py +++ b/insights/core/serde.py @@ -196,27 +196,29 @@ def dehydrate(self, comp, broker): fs.ensure_path(self.data, mode=0o770) self.created = True - c = comp - doc = None try: - name = dr.get_name(c) - # The `broker.trackbacks` is a dict in which the values are string - # but not list of strings - errors = [broker.tracebacks[e] for e in broker.exceptions.get(c, [])] - doc = { - "name": name, - "exec_time": broker.exec_times.get(c), - "errors": errors - } + name = dr.get_name(comp) + + # The `broker.tracebacks` is a dict in which the values are string + # but not list of strings. + errors = [broker.tracebacks[e] for e in broker.exceptions.get(comp, [])] + start = time.time() results, ms_errors = marshal(comp, broker, root=self.data, pool=self.pool) - doc["results"] = results if results else None errors.extend(ms_errors if isinstance(ms_errors, list) else [ms_errors]) if ms_errors else None - doc["ser_time"] = time.time() - start + + doc = { + "name": name, + "exec_time": broker.exec_times.get(comp), + "errors": errors, + "results": results if results else None, + "ser_time": time.time() - start + } except Exception as ex: log.exception(ex) else: if doc is not None and (doc["results"] or doc["errors"]): + path = None try: path = os.path.join(self.meta_data, name + "." + self.ser_name) with open(path, "w") as f: diff --git a/insights/specs/datasources/container/__init__.py b/insights/specs/datasources/container/__init__.py index c3481cd82d..b69ef62de0 100644 --- a/insights/specs/datasources/container/__init__.py +++ b/insights/specs/datasources/container/__init__.py @@ -9,7 +9,7 @@ from insights.parsers.docker_list import DockerListContainers -@datasource([PodmanListContainers, DockerListContainers], HostContext) +@datasource([PodmanListContainers, DockerListContainers], HostContext, timeout=240) def running_rhel_containers(broker): """ Returns a list of tuple of (image, , container_id) of the running diff --git a/insights/tests/core/test_dr.py b/insights/tests/core/test_dr.py new file mode 100644 index 0000000000..605906e39a --- /dev/null +++ b/insights/tests/core/test_dr.py @@ -0,0 +1,54 @@ +from insights.core import dr +from insights.core.plugins import datasource +from insights.core.spec_factory import DatasourceProvider, RegistryPoint, SpecSet + + +class RegistrySpecs(SpecSet): + simple_spec = RegistryPoint() + first_spec_with_dep = RegistryPoint() + second_spec_with_dep = RegistryPoint() + + +@datasource() +def simple_spec_imp(broker): + return DatasourceProvider("some data", "/path_1") + + +@datasource() +def dependency_ds(broker): + return DatasourceProvider("dependency data", "/path_2") + + +@datasource(dependency_ds) +def first_spec_with_dep_imp(broker): + return DatasourceProvider("some data", "/path_3") + + +@datasource(dependency_ds) +def second_spec_with_dep_imp(broker): + return DatasourceProvider("some data", "/path_4") + + +class DefaultSpecs(RegistrySpecs): + simple_spec = simple_spec_imp + first_spec_with_dep = first_spec_with_dep_imp + second_spec_with_dep = second_spec_with_dep_imp + + +def test_is_registry_point(): + assert dr.is_registry_point(RegistrySpecs.simple_spec) + assert not dr.is_registry_point(DefaultSpecs.simple_spec) + + +def test_get_registry_points_simple(): + specs = dr.get_registry_points(DefaultSpecs.simple_spec) + assert len(specs) == 1 + spec = list(specs)[0] + assert spec == RegistrySpecs.simple_spec + + +def test_get_registry_points_multiple_specs(): + specs = dr.get_registry_points(dependency_ds) + assert len(specs) == 2 + for spec in specs: + assert spec in [RegistrySpecs.first_spec_with_dep, RegistrySpecs.second_spec_with_dep] diff --git a/insights/tests/datasources/test_datasource_timeout.py b/insights/tests/datasources/test_datasource_timeout.py new file mode 100644 index 0000000000..55b503ffaf --- /dev/null +++ b/insights/tests/datasources/test_datasource_timeout.py @@ -0,0 +1,80 @@ +import time + +from insights.core.dr import run +from insights.core.plugins import TimeoutException, datasource, make_info, rule +from insights.core.spec_factory import DatasourceProvider, RegistryPoint, SpecSet, foreach_execute + + +class Specs(SpecSet): + spec_ds_timeout_1 = RegistryPoint() + spec_ds_timeout_2 = RegistryPoint() + spec_ds_timeout_default = RegistryPoint() + spec_foreach_ds_timeout_1 = RegistryPoint(multi_output=True) + + +@datasource(timeout=1) +def foreach_ds_timeout_1(broker): + time.sleep(2) + return ['test1', 'test2', 'test3'] + + +@datasource(timeout=1) +def ds_timeout_1(broker): + time.sleep(2) + return DatasourceProvider('foo', "test_ds_timeout_1") + + +@datasource(timeout=3) +def ds_timeout_2(broker): + time.sleep(1) + return DatasourceProvider('foo', "test_ds_timeout_2") + + +@datasource() +def ds_timeout_default(broker): + time.sleep(1) + return DatasourceProvider('foo', "test_ds_timeout_def") + + +class TestSpecs(Specs): + spec_ds_timeout_1 = ds_timeout_1 + spec_ds_timeout_2 = ds_timeout_2 + spec_ds_timeout_default = ds_timeout_default + spec_foreach_ds_timeout_1 = foreach_execute(foreach_ds_timeout_1, "/usr/bin/echo %s") + + +@rule(Specs.spec_ds_timeout_2, Specs.spec_ds_timeout_default) +def timeout_datasource_no_timeout(ds_to_2, ds_to_def): + return make_info('INFO_1') + + +@rule(Specs.spec_ds_timeout_1) +def timeout_datasource_hit(ds_to_1): + return make_info('INFO_2') + + +@rule(Specs.spec_foreach_ds_timeout_1) +def timeout_foreach_datasource_hit(foreach_ds_to_1): + return make_info('INFO_2') + + +def test_timeout_datasource_no_hit(): + broker = run(timeout_datasource_no_timeout) + assert timeout_datasource_no_timeout in broker + assert Specs.spec_ds_timeout_2 not in broker.exceptions + + +def test_timeout_datasource_hit_def(): + broker = run(timeout_datasource_hit) + assert timeout_datasource_hit in broker + assert Specs.spec_ds_timeout_1 in broker.exceptions + exs = broker.exceptions[Specs.spec_ds_timeout_1] + assert [ex for ex in exs if isinstance(ex, TimeoutException) and str(ex) == "Datasource spec insights.tests.datasources.test_datasource_timeout.TestSpecs.spec_ds_timeout_1 timed out after 1 seconds!"] + + +def test_timeout_foreach_datasource_hit_def(): + broker = run(timeout_foreach_datasource_hit) + assert timeout_foreach_datasource_hit in broker + assert Specs.spec_foreach_ds_timeout_1 in broker.exceptions + exs = broker.exceptions[Specs.spec_foreach_ds_timeout_1] + assert [ex for ex in exs if isinstance(ex, TimeoutException) and str(ex) == "Datasource spec insights.tests.datasources.test_datasource_timeout.foreach_ds_timeout_1 timed out after 1 seconds!"] diff --git a/insights/tests/test_add_exception.py b/insights/tests/test_add_exception.py index 7abac0433a..a09441bef1 100644 --- a/insights/tests/test_add_exception.py +++ b/insights/tests/test_add_exception.py @@ -21,8 +21,8 @@ def report(dt): def test_broker_add_exception(): broker = dr.run(report) assert report in broker - assert TestSpecs.the_data in broker.exceptions - spec_exs = broker.exceptions[TestSpecs.the_data] + assert Specs.the_data in broker.exceptions + spec_exs = broker.exceptions[Specs.the_data] exs = [ex for ex in spec_exs if isinstance(ex, ContentException) and str(ex) == "Fake Datasource"] assert len(exs) == 1 tb = broker.tracebacks[exs[0]] diff --git a/insights/tests/test_serde.py b/insights/tests/test_serde.py index 9ebd60e462..4557f850b0 100644 --- a/insights/tests/test_serde.py +++ b/insights/tests/test_serde.py @@ -260,7 +260,7 @@ def test_dehydrate(): tb = broker.tracebacks[exc] tmp_path = mkdtemp() - spec_the_data = TestSpecs.the_data + spec_the_data = Specs.the_data try: h = Hydration(tmp_path) h.dehydrate(spec_the_data, broker)