Skip to content

Commit

Permalink
feat: Add timeout to datasources (#3598)
Browse files Browse the repository at this point in the history
* Added a timeout, implemented with signals, to the datasource invoke
  function; when the timeout is hit, it raises a TimeoutException.
* Added code to the dehydrate method to properly dehydrate caught
  exceptions, and simplified the method to make it more readable.

Signed-off-by: Ryan Blakley <[email protected]>

Signed-off-by: Ryan Blakley <[email protected]>
  • Loading branch information
ryan-blakley authored Dec 1, 2022
1 parent 4b8f720 commit 4d3a4a0
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 20 deletions.
33 changes: 33 additions & 0 deletions insights/core/dr.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,39 @@ def get_at_least_one(comp):
return get_requires(component) + get_at_least_one(component)


def is_registry_point(component):
    """
    Tell whether a component is a spec registry point.

    The check compares the class name of the component's exact type with
    ``"RegistryPoint"``; no ``isinstance`` test is performed, so a type
    with any other name does not match.

    Args:
        component (object): The component object to inspect.

    Returns:
        bool: True when the type of ``component`` is named "RegistryPoint".
    """
    type_name = type(component).__name__
    return type_name == "RegistryPoint"


def get_registry_points(component):
    """
    Walk the dependency graph to find the spec registry points that
    correspond to the component. This is primarily used by datasources.

    Starting at ``component``, the graph is followed through
    ``get_dependents`` until a registry point is reached on each path; the
    walk does not continue past a registry point. In most cases only one
    registry point is found, but in some cases there is more than one.

    Args:
        component (callable): The component object

    Returns:
        set: The registry points found. Contains only ``component`` itself
        when it is a registry point, and is empty when none is reachable.
    """
    reg_points = set()

    # Track visited components so that a component shared by several paths
    # is only expanded once.
    visited = set()
    pending = [component]

    while pending:
        current = pending.pop()
        if current in visited:
            continue
        visited.add(current)

        if is_registry_point(current):
            reg_points.add(current)
        else:
            pending.extend(get_dependents(current))

    return reg_points


def get_subgraphs(graph=None):
"""
Given a graph of possibly disconnected components, generate all graphs of
Expand Down
33 changes: 30 additions & 3 deletions insights/core/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from __future__ import print_function

import logging
import signal
import traceback

from pprint import pformat
Expand All @@ -44,6 +45,11 @@ class ContentException(dr.SkipComponent):
pass


class TimeoutException(Exception):
    """
    Signals that a :class:`datasource` ran past its configured timeout value.
    """


class PluginType(dr.ComponentType):
"""
PluginType is the base class of plugin types like datasource, rule, etc.
Expand Down Expand Up @@ -81,21 +87,42 @@ class datasource(PluginType):
Decorates a component that one or more :class:`insights.core.Parser`
subclasses will consume.
"""
filterable = False
multi_output = False
raw = False
filterable = False

def _handle_timeout(self, signum, frame):
    """
    Signal handler that aborts the datasource by raising TimeoutException.

    Args:
        signum: The signal number passed by the signal machinery (unused).
        frame: The interrupted stack frame (unused).

    Raises:
        TimeoutException: Always; the message names the component and the
            timeout, in seconds, stored on ``self.timeout``.
    """
    message = "Datasource spec {ds_name} timed out after {secs} seconds!".format(
        ds_name=dr.get_name(self.component), secs=self.timeout)
    raise TimeoutException(message)

def invoke(self, broker):
# Grab the timeout from the decorator, or use the default of 120.
self.timeout = getattr(self, "timeout", 120)

signal.signal(signal.SIGALRM, self._handle_timeout)
signal.alarm(self.timeout)
try:
return self.component(broker)
except ContentException as ce:
log.debug(ce)
broker.add_exception(self.component, ce, traceback.format_exc())
ce_tb = traceback.format_exc()
for reg_spec in dr.get_registry_points(self.component):
broker.add_exception(reg_spec, ce, ce_tb)
raise dr.SkipComponent()
except CalledProcessError as cpe:
log.debug(cpe)
broker.add_exception(self.component, cpe, traceback.format_exc())
cpe_tb = traceback.format_exc()
for reg_spec in dr.get_registry_points(self.component):
broker.add_exception(reg_spec, cpe, cpe_tb)
raise dr.SkipComponent()
except TimeoutException as te:
log.debug(te)
te_tb = traceback.format_exc()
for reg_spec in dr.get_registry_points(self.component):
broker.add_exception(reg_spec, te, te_tb)
raise dr.SkipComponent()
finally:
signal.alarm(0)


class parser(PluginType):
Expand Down
28 changes: 15 additions & 13 deletions insights/core/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,27 +196,29 @@ def dehydrate(self, comp, broker):
fs.ensure_path(self.data, mode=0o770)
self.created = True

c = comp
doc = None
try:
name = dr.get_name(c)
# The `broker.trackbacks` is a dict in which the values are string
# but not list of strings
errors = [broker.tracebacks[e] for e in broker.exceptions.get(c, [])]
doc = {
"name": name,
"exec_time": broker.exec_times.get(c),
"errors": errors
}
name = dr.get_name(comp)

# The `broker.tracebacks` is a dict in which the values are string
# but not list of strings.
errors = [broker.tracebacks[e] for e in broker.exceptions.get(comp, [])]

start = time.time()
results, ms_errors = marshal(comp, broker, root=self.data, pool=self.pool)
doc["results"] = results if results else None
errors.extend(ms_errors if isinstance(ms_errors, list) else [ms_errors]) if ms_errors else None
doc["ser_time"] = time.time() - start

doc = {
"name": name,
"exec_time": broker.exec_times.get(comp),
"errors": errors,
"results": results if results else None,
"ser_time": time.time() - start
}
except Exception as ex:
log.exception(ex)
else:
if doc is not None and (doc["results"] or doc["errors"]):
path = None
try:
path = os.path.join(self.meta_data, name + "." + self.ser_name)
with open(path, "w") as f:
Expand Down
2 changes: 1 addition & 1 deletion insights/specs/datasources/container/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from insights.parsers.docker_list import DockerListContainers


@datasource([PodmanListContainers, DockerListContainers], HostContext)
@datasource([PodmanListContainers, DockerListContainers], HostContext, timeout=240)
def running_rhel_containers(broker):
"""
Returns a list of tuple of (image, <podman|docker>, container_id) of the running
Expand Down
54 changes: 54 additions & 0 deletions insights/tests/core/test_dr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from insights.core import dr
from insights.core.plugins import datasource
from insights.core.spec_factory import DatasourceProvider, RegistryPoint, SpecSet


# Declares the registry points that the tests in this module look up:
# one for a datasource without dependencies and two for datasources that
# share a dependency.
class RegistrySpecs(SpecSet):
    simple_spec = RegistryPoint()
    first_spec_with_dep = RegistryPoint()
    second_spec_with_dep = RegistryPoint()


@datasource()
def simple_spec_imp(broker):
    """Datasource without dependencies; returns a provider of fixed test values."""
    provider = DatasourceProvider("some data", "/path_1")
    return provider


@datasource()
def dependency_ds(broker):
    """Datasource that the two ``*_with_dep_imp`` datasources depend on."""
    provider = DatasourceProvider("dependency data", "/path_2")
    return provider


@datasource(dependency_ds)
def first_spec_with_dep_imp(broker):
    """First datasource declared with ``dependency_ds`` as its dependency."""
    provider = DatasourceProvider("some data", "/path_3")
    return provider


@datasource(dependency_ds)
def second_spec_with_dep_imp(broker):
    """Second datasource declared with ``dependency_ds`` as its dependency."""
    provider = DatasourceProvider("some data", "/path_4")
    return provider


# Assigns a datasource function to each name that RegistrySpecs declares
# as a RegistryPoint.
class DefaultSpecs(RegistrySpecs):
    simple_spec = simple_spec_imp
    first_spec_with_dep = first_spec_with_dep_imp
    second_spec_with_dep = second_spec_with_dep_imp


def test_is_registry_point():
    """A RegistryPoint is recognized; the datasource assigned to its name is not."""
    registry_point = RegistrySpecs.simple_spec
    implementation = DefaultSpecs.simple_spec

    assert dr.is_registry_point(registry_point)
    assert not dr.is_registry_point(implementation)


def test_get_registry_points_simple():
    """A datasource without dependencies maps to exactly one registry point."""
    found = dr.get_registry_points(DefaultSpecs.simple_spec)
    assert len(found) == 1

    # Exactly one element is present, so it can be unpacked directly.
    (only_point,) = found
    assert only_point == RegistrySpecs.simple_spec


def test_get_registry_points_multiple_specs():
    """A datasource shared by two specs maps to both of their registry points."""
    expected = [RegistrySpecs.first_spec_with_dep, RegistrySpecs.second_spec_with_dep]

    found = dr.get_registry_points(dependency_ds)

    assert len(found) == 2
    assert all(point in expected for point in found)
80 changes: 80 additions & 0 deletions insights/tests/datasources/test_datasource_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import time

from insights.core.dr import run
from insights.core.plugins import TimeoutException, datasource, make_info, rule
from insights.core.spec_factory import DatasourceProvider, RegistryPoint, SpecSet, foreach_execute


# Registry points for the timeout tests: one per datasource defined below.
# Only the foreach registry point is created with multi_output=True.
class Specs(SpecSet):
    spec_ds_timeout_1 = RegistryPoint()
    spec_ds_timeout_2 = RegistryPoint()
    spec_ds_timeout_default = RegistryPoint()
    spec_foreach_ds_timeout_1 = RegistryPoint(multi_output=True)


@datasource(timeout=1)
def foreach_ds_timeout_1(broker):
    """Sleep for 2 seconds, longer than the 1 second timeout, then return a list."""
    time.sleep(2)
    items = ['test1', 'test2', 'test3']
    return items


@datasource(timeout=1)
def ds_timeout_1(broker):
    """Sleep for 2 seconds, longer than the 1 second timeout, then return a provider."""
    time.sleep(2)
    provider = DatasourceProvider('foo', "test_ds_timeout_1")
    return provider


@datasource(timeout=3)
def ds_timeout_2(broker):
    """Sleep for 1 second, shorter than the 3 second timeout, then return a provider."""
    time.sleep(1)
    provider = DatasourceProvider('foo', "test_ds_timeout_2")
    return provider


@datasource()
def ds_timeout_default(broker):
    """Sleep for 1 second with no explicit timeout set, then return a provider."""
    time.sleep(1)
    provider = DatasourceProvider('foo', "test_ds_timeout_def")
    return provider


# Assigns a datasource to each name declared on Specs. The last entry wraps
# foreach_ds_timeout_1 with foreach_execute and the command
# "/usr/bin/echo %s".
class TestSpecs(Specs):
    spec_ds_timeout_1 = ds_timeout_1
    spec_ds_timeout_2 = ds_timeout_2
    spec_ds_timeout_default = ds_timeout_default
    spec_foreach_ds_timeout_1 = foreach_execute(foreach_ds_timeout_1, "/usr/bin/echo %s")


@rule(Specs.spec_ds_timeout_2, Specs.spec_ds_timeout_default)
def timeout_datasource_no_timeout(ds_to_2, ds_to_def):
    """Rule over the two specs whose datasources sleep for less than their timeout."""
    response = make_info('INFO_1')
    return response


@rule(Specs.spec_ds_timeout_1)
def timeout_datasource_hit(ds_to_1):
    """Rule over the spec whose datasource sleeps past its 1 second timeout."""
    response = make_info('INFO_2')
    return response


@rule(Specs.spec_foreach_ds_timeout_1)
def timeout_foreach_datasource_hit(foreach_ds_to_1):
    """Rule over the foreach spec whose source datasource sleeps past its timeout."""
    response = make_info('INFO_2')
    return response


def test_timeout_datasource_no_hit():
    """No exception is recorded for a spec whose datasource finishes in time."""
    result = run(timeout_datasource_no_timeout)
    assert timeout_datasource_no_timeout in result

    recorded = result.exceptions
    assert Specs.spec_ds_timeout_2 not in recorded


def test_timeout_datasource_hit_def():
    """A timed-out datasource records a TimeoutException on its registry point."""
    expected_msg = "Datasource spec insights.tests.datasources.test_datasource_timeout.TestSpecs.spec_ds_timeout_1 timed out after 1 seconds!"

    result = run(timeout_datasource_hit)
    assert timeout_datasource_hit in result
    assert Specs.spec_ds_timeout_1 in result.exceptions

    recorded = result.exceptions[Specs.spec_ds_timeout_1]
    assert any(isinstance(ex, TimeoutException) and str(ex) == expected_msg for ex in recorded)


def test_timeout_foreach_datasource_hit_def():
    """A timed-out foreach source records a TimeoutException on the foreach registry point."""
    expected_msg = "Datasource spec insights.tests.datasources.test_datasource_timeout.foreach_ds_timeout_1 timed out after 1 seconds!"

    result = run(timeout_foreach_datasource_hit)
    assert timeout_foreach_datasource_hit in result
    assert Specs.spec_foreach_ds_timeout_1 in result.exceptions

    recorded = result.exceptions[Specs.spec_foreach_ds_timeout_1]
    assert any(isinstance(ex, TimeoutException) and str(ex) == expected_msg for ex in recorded)
4 changes: 2 additions & 2 deletions insights/tests/test_add_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def report(dt):
def test_broker_add_exception():
broker = dr.run(report)
assert report in broker
assert TestSpecs.the_data in broker.exceptions
spec_exs = broker.exceptions[TestSpecs.the_data]
assert Specs.the_data in broker.exceptions
spec_exs = broker.exceptions[Specs.the_data]
exs = [ex for ex in spec_exs if isinstance(ex, ContentException) and str(ex) == "Fake Datasource"]
assert len(exs) == 1
tb = broker.tracebacks[exs[0]]
Expand Down
2 changes: 1 addition & 1 deletion insights/tests/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def test_dehydrate():
tb = broker.tracebacks[exc]

tmp_path = mkdtemp()
spec_the_data = TestSpecs.the_data
spec_the_data = Specs.the_data
try:
h = Hydration(tmp_path)
h.dehydrate(spec_the_data, broker)
Expand Down

0 comments on commit 4d3a4a0

Please sign in to comment.