diff --git a/panoply/constants.py b/panoply/constants.py index a7931f6..89e1072 100644 --- a/panoply/constants.py +++ b/panoply/constants.py @@ -1,2 +1,2 @@ -__version__ = "2.0.15" +__version__ = "2.1.0" __package_name__ = "panoply-python-sdk" diff --git a/panoply/datasource.py b/panoply/datasource.py index 5e95d65..50a9119 100644 --- a/panoply/datasource.py +++ b/panoply/datasource.py @@ -1,8 +1,10 @@ import base64 import traceback +import concurrent.futures.thread from concurrent.futures import ThreadPoolExecutor from functools import wraps from threading import Event +from time import time import backoff import requests @@ -153,7 +155,7 @@ def wrapper(*args, **kwargs): return _validate_token -def background_progress(message, waiting_interval=10 * 60): +def background_progress(message, waiting_interval=10 * 60, timeout=24*60*60): """ A decorator is used to emit progress while long operation is executed. For example, for database's data sources such operations might be declaration of the cursor or counting number of rows. @@ -167,6 +169,9 @@ def background_progress(message, waiting_interval=10 * 60): waiting_interval : float Time in seconds to wait between progress emitting. Defaults to 10 minutes + timeout : float + Time in seconds for maximum progress emiting time. + Defaults to no 24 hours """ def _background_progress(func): @@ -174,12 +179,19 @@ def _background_progress(func): def wrapper(*args, **kwargs): self = args[0] self.log('Creating background progress emitter') + self.log(f'Timeout is set to {max_wait} seconds') finished = Event() + started_at = time() with ThreadPoolExecutor(max_workers=1) as executor: func_future = executor.submit(func, *args, **kwargs) func_future.add_done_callback(lambda future: finished.set()) while not func_future.done(): + if (time() - started_at) > timeout: + self.log("Max waiting time exceeded. Clearing threads.") + executor._threads.clear() + concurrent.futures.thread._threads_queues.clear() + raise Exception("Max waiting time exceeded") self.log(message) self.progress(None, None, message) finished.wait(timeout=waiting_interval) @@ -189,3 +201,4 @@ def wrapper(*args, **kwargs): return wrapper return _background_progress +