Skip to content

Commit

Permalink
Merge pull request #51 from MykytaPanoply/ds-3139_bg_progress_timeout
Browse files Browse the repository at this point in the history
[DS-3139] Add timeout to background progress
  • Loading branch information
MykytaPanoply authored Feb 1, 2023
2 parents a55ac07 + 3e539ec commit a0295d4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
2 changes: 1 addition & 1 deletion panoply/constants.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__version__ = "2.0.15"
__version__ = "2.1.0"
__package_name__ = "panoply-python-sdk"
15 changes: 14 additions & 1 deletion panoply/datasource.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import base64
import traceback
import concurrent.futures.thread
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from threading import Event
from time import time

import backoff
import requests
Expand Down Expand Up @@ -153,7 +155,7 @@ def wrapper(*args, **kwargs):
return _validate_token


def background_progress(message, waiting_interval=10 * 60):
def background_progress(message, waiting_interval=10 * 60, timeout=24*60*60):
""" A decorator is used to emit progress while long operation is executed.
For example, for database's data sources such operations might be
declaration of the cursor or counting number of rows.
Expand All @@ -167,19 +169,29 @@ def background_progress(message, waiting_interval=10 * 60):
waiting_interval : float
Time in seconds to wait between progress emitting.
Defaults to 10 minutes
timeout : float
Time in seconds for maximum progress emiting time.
Defaults to no 24 hours
"""

def _background_progress(func):
@wraps(func)
def wrapper(*args, **kwargs):
self = args[0]
self.log('Creating background progress emitter')
self.log(f'Timeout is set to {max_wait} seconds')
finished = Event()
started_at = time()
with ThreadPoolExecutor(max_workers=1) as executor:
func_future = executor.submit(func, *args, **kwargs)
func_future.add_done_callback(lambda future: finished.set())

while not func_future.done():
if (time() - started_at) > timeout:
self.log("Max waiting time exceeded. Clearing threads.")
executor._threads.clear()
concurrent.futures.thread._threads_queues.clear()
raise Exception("Max waiting time exceeded")
self.log(message)
self.progress(None, None, message)
finished.wait(timeout=waiting_interval)
Expand All @@ -189,3 +201,4 @@ def wrapper(*args, **kwargs):
return wrapper

return _background_progress

0 comments on commit a0295d4

Please sign in to comment.