Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add timeout to datasources #3598

Merged
merged 1 commit into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions insights/core/dr.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,39 @@ def get_at_least_one(comp):
return get_requires(component) + get_at_least_one(component)


def is_registry_point(component):
return type(component).__name__ == "RegistryPoint"


def get_registry_points(component):
"""
Loop through the dependency graph to identify the corresponding spec registry
points for the component. This is primarily used by datasources and returns a
`set`. In most cases only one registry point will be included in the set, but
in some cases more than one.

Args:
component (callable): The component object

Returns:
(set): A list of the registry points found.
"""
reg_points = set()

if is_registry_point(component):
reg_points.add(component)
else:
for dep in get_dependents(component):
if is_registry_point(dep):
reg_points.add(dep)
else:
dep_reg_pts = get_registry_points(dep)
if dep_reg_pts:
reg_points.update(dep_reg_pts)

return reg_points


def get_subgraphs(graph=None):
"""
Given a graph of possibly disconnected components, generate all graphs of
Expand Down
33 changes: 30 additions & 3 deletions insights/core/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from __future__ import print_function

import logging
import signal
import traceback

from pprint import pformat
Expand All @@ -44,6 +45,11 @@ class ContentException(dr.SkipComponent):
pass


class TimeoutException(Exception):
""" Raised whenever a :class:`datasource` hits the set timeout value. """
pass


class PluginType(dr.ComponentType):
"""
PluginType is the base class of plugin types like datasource, rule, etc.
Expand Down Expand Up @@ -81,21 +87,42 @@ class datasource(PluginType):
Decorates a component that one or more :class:`insights.core.Parser`
subclasses will consume.
"""
filterable = False
multi_output = False
raw = False
filterable = False

def _handle_timeout(self, signum, frame):
raise TimeoutException("Datasource spec {ds_name} timed out after {secs} seconds!".format(
ds_name=dr.get_name(self.component), secs=self.timeout))

def invoke(self, broker):
# Grab the timeout from the decorator, or use the default of 120.
self.timeout = getattr(self, "timeout", 120)

signal.signal(signal.SIGALRM, self._handle_timeout)
signal.alarm(self.timeout)
try:
return self.component(broker)
except ContentException as ce:
log.debug(ce)
broker.add_exception(self.component, ce, traceback.format_exc())
ce_tb = traceback.format_exc()
for reg_spec in dr.get_registry_points(self.component):
broker.add_exception(reg_spec, ce, ce_tb)
raise dr.SkipComponent()
except CalledProcessError as cpe:
log.debug(cpe)
broker.add_exception(self.component, cpe, traceback.format_exc())
cpe_tb = traceback.format_exc()
for reg_spec in dr.get_registry_points(self.component):
broker.add_exception(reg_spec, cpe, cpe_tb)
raise dr.SkipComponent()
except TimeoutException as te:
log.debug(te)
te_tb = traceback.format_exc()
for reg_spec in dr.get_registry_points(self.component):
broker.add_exception(reg_spec, te, te_tb)
raise dr.SkipComponent()
finally:
signal.alarm(0)


class parser(PluginType):
Expand Down
28 changes: 15 additions & 13 deletions insights/core/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,27 +196,29 @@ def dehydrate(self, comp, broker):
fs.ensure_path(self.data, mode=0o770)
self.created = True

c = comp
doc = None
try:
name = dr.get_name(c)
# The `broker.trackbacks` is a dict in which the values are string
# but not list of strings
errors = [broker.tracebacks[e] for e in broker.exceptions.get(c, [])]
doc = {
"name": name,
"exec_time": broker.exec_times.get(c),
"errors": errors
}
name = dr.get_name(comp)

# The `broker.tracebacks` is a dict in which the values are string
# but not list of strings.
errors = [broker.tracebacks[e] for e in broker.exceptions.get(comp, [])]

start = time.time()
results, ms_errors = marshal(comp, broker, root=self.data, pool=self.pool)
doc["results"] = results if results else None
errors.extend(ms_errors if isinstance(ms_errors, list) else [ms_errors]) if ms_errors else None
doc["ser_time"] = time.time() - start

doc = {
"name": name,
"exec_time": broker.exec_times.get(comp),
"errors": errors,
"results": results if results else None,
"ser_time": time.time() - start
}
except Exception as ex:
log.exception(ex)
else:
if doc is not None and (doc["results"] or doc["errors"]):
path = None
try:
path = os.path.join(self.meta_data, name + "." + self.ser_name)
with open(path, "w") as f:
Expand Down
2 changes: 1 addition & 1 deletion insights/specs/datasources/container/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from insights.parsers.docker_list import DockerListContainers


@datasource([PodmanListContainers, DockerListContainers], HostContext)
@datasource([PodmanListContainers, DockerListContainers], HostContext, timeout=240)
def running_rhel_containers(broker):
"""
Returns a list of tuple of (image, <podman|docker>, container_id) of the running
Expand Down
54 changes: 54 additions & 0 deletions insights/tests/core/test_dr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from insights.core import dr
from insights.core.plugins import datasource
from insights.core.spec_factory import DatasourceProvider, RegistryPoint, SpecSet


class RegistrySpecs(SpecSet):
simple_spec = RegistryPoint()
first_spec_with_dep = RegistryPoint()
second_spec_with_dep = RegistryPoint()


@datasource()
def simple_spec_imp(broker):
return DatasourceProvider("some data", "/path_1")


@datasource()
def dependency_ds(broker):
return DatasourceProvider("dependency data", "/path_2")


@datasource(dependency_ds)
def first_spec_with_dep_imp(broker):
return DatasourceProvider("some data", "/path_3")


@datasource(dependency_ds)
def second_spec_with_dep_imp(broker):
return DatasourceProvider("some data", "/path_4")


class DefaultSpecs(RegistrySpecs):
simple_spec = simple_spec_imp
first_spec_with_dep = first_spec_with_dep_imp
second_spec_with_dep = second_spec_with_dep_imp


def test_is_registry_point():
assert dr.is_registry_point(RegistrySpecs.simple_spec)
assert not dr.is_registry_point(DefaultSpecs.simple_spec)


def test_get_registry_points_simple():
specs = dr.get_registry_points(DefaultSpecs.simple_spec)
assert len(specs) == 1
spec = list(specs)[0]
assert spec == RegistrySpecs.simple_spec


def test_get_registry_points_multiple_specs():
specs = dr.get_registry_points(dependency_ds)
assert len(specs) == 2
for spec in specs:
assert spec in [RegistrySpecs.first_spec_with_dep, RegistrySpecs.second_spec_with_dep]
80 changes: 80 additions & 0 deletions insights/tests/datasources/test_datasource_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import time

from insights.core.dr import run
from insights.core.plugins import TimeoutException, datasource, make_info, rule
from insights.core.spec_factory import DatasourceProvider, RegistryPoint, SpecSet, foreach_execute


class Specs(SpecSet):
spec_ds_timeout_1 = RegistryPoint()
spec_ds_timeout_2 = RegistryPoint()
spec_ds_timeout_default = RegistryPoint()
spec_foreach_ds_timeout_1 = RegistryPoint(multi_output=True)


@datasource(timeout=1)
def foreach_ds_timeout_1(broker):
time.sleep(2)
return ['test1', 'test2', 'test3']


@datasource(timeout=1)
def ds_timeout_1(broker):
time.sleep(2)
return DatasourceProvider('foo', "test_ds_timeout_1")


@datasource(timeout=3)
def ds_timeout_2(broker):
time.sleep(1)
return DatasourceProvider('foo', "test_ds_timeout_2")


@datasource()
def ds_timeout_default(broker):
time.sleep(1)
return DatasourceProvider('foo', "test_ds_timeout_def")


class TestSpecs(Specs):
spec_ds_timeout_1 = ds_timeout_1
spec_ds_timeout_2 = ds_timeout_2
spec_ds_timeout_default = ds_timeout_default
spec_foreach_ds_timeout_1 = foreach_execute(foreach_ds_timeout_1, "/usr/bin/echo %s")


@rule(Specs.spec_ds_timeout_2, Specs.spec_ds_timeout_default)
def timeout_datasource_no_timeout(ds_to_2, ds_to_def):
return make_info('INFO_1')


@rule(Specs.spec_ds_timeout_1)
def timeout_datasource_hit(ds_to_1):
return make_info('INFO_2')


@rule(Specs.spec_foreach_ds_timeout_1)
def timeout_foreach_datasource_hit(foreach_ds_to_1):
return make_info('INFO_2')


def test_timeout_datasource_no_hit():
broker = run(timeout_datasource_no_timeout)
assert timeout_datasource_no_timeout in broker
assert Specs.spec_ds_timeout_2 not in broker.exceptions


def test_timeout_datasource_hit_def():
broker = run(timeout_datasource_hit)
assert timeout_datasource_hit in broker
assert Specs.spec_ds_timeout_1 in broker.exceptions
exs = broker.exceptions[Specs.spec_ds_timeout_1]
assert [ex for ex in exs if isinstance(ex, TimeoutException) and str(ex) == "Datasource spec insights.tests.datasources.test_datasource_timeout.TestSpecs.spec_ds_timeout_1 timed out after 1 seconds!"]


def test_timeout_foreach_datasource_hit_def():
broker = run(timeout_foreach_datasource_hit)
assert timeout_foreach_datasource_hit in broker
assert Specs.spec_foreach_ds_timeout_1 in broker.exceptions
exs = broker.exceptions[Specs.spec_foreach_ds_timeout_1]
assert [ex for ex in exs if isinstance(ex, TimeoutException) and str(ex) == "Datasource spec insights.tests.datasources.test_datasource_timeout.foreach_ds_timeout_1 timed out after 1 seconds!"]
4 changes: 2 additions & 2 deletions insights/tests/test_add_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def report(dt):
def test_broker_add_exception():
broker = dr.run(report)
assert report in broker
assert TestSpecs.the_data in broker.exceptions
spec_exs = broker.exceptions[TestSpecs.the_data]
assert Specs.the_data in broker.exceptions
spec_exs = broker.exceptions[Specs.the_data]
exs = [ex for ex in spec_exs if isinstance(ex, ContentException) and str(ex) == "Fake Datasource"]
assert len(exs) == 1
tb = broker.tracebacks[exs[0]]
Expand Down
2 changes: 1 addition & 1 deletion insights/tests/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def test_dehydrate():
tb = broker.tracebacks[exc]

tmp_path = mkdtemp()
spec_the_data = TestSpecs.the_data
spec_the_data = Specs.the_data
try:
h = Hydration(tmp_path)
h.dehydrate(spec_the_data, broker)
Expand Down