From 782344d8928b26053818cf29d61b380508095b50 Mon Sep 17 00:00:00 2001 From: Xiangce Liu Date: Wed, 8 Jun 2022 11:09:58 +0800 Subject: [PATCH] feat: Support parallelly running for insights-engine (#3436) Signed-off-by: Xiangce Liu (cherry picked from commit bbabfae7ecc5ec8802abb5cb4b23dfe0c40ea2ac) --- insights/core/evaluators.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/insights/core/evaluators.py b/insights/core/evaluators.py index 43b43c4be4..94eee05460 100644 --- a/insights/core/evaluators.py +++ b/insights/core/evaluators.py @@ -53,9 +53,13 @@ def preprocess(self): def run_serial(self, graph=None): dr.run(graph or dr.COMPONENTS[dr.GROUPS.single], broker=self.broker) - def run_incremental(self, graph=None): - for _ in dr.run_incremental(graph or dr.COMPONENTS[dr.GROUPS.single], broker=self.broker): - pass + def run_incremental(self, graph=None, parallel=False): + components = graph or dr.COMPONENTS[dr.GROUPS.single] + if parallel: + with insights.get_pool(parallel, "insights-engine-pool", {"max_workers": None}) as pool: + dr.run_all(components, self.broker, pool) + else: + dr.run_all(components, self.broker) def format_response(self, response): """ @@ -70,10 +74,10 @@ def format_result(self, result): """ return result - def process(self, graph=None): + def process(self, graph=None, parallel=False): with self: if self.incremental: - self.run_incremental(graph) + self.run_incremental(graph, parallel) else: self.run_serial(graph) return self.get_response()