diff --git a/panoply/__init__.py b/panoply/__init__.py index a2ae238..4587a3f 100644 --- a/panoply/__init__.py +++ b/panoply/__init__.py @@ -1,5 +1,33 @@ +import logging +from sys import stdout +from traceback import print_exception as _print_exception + from .datasource import * from .records import * from .resources import * from .sdk import * from .ssh import SSHTunnel + +logging.basicConfig(stream=stdout, format='%(levelname)s: %(message)s') + + +def custom_excepthook(args): + """ + Handle uncaught Thread.run() exception + and print error text to STDOUT instead of STDERR. + + "It's always assumed that + the runnner is single-threaded and synchronuous such that `result` events + are only assigned to the last executed request". + see https://github.com/panoplyio/legacy-source-wrapper/blob/master/src/sources-runner/index.js#L74 + """ + if args.exc_type == SystemExit: + # silently ignore SystemExit + return + + logging.error("Caught an exception in thread:") + _print_exception(args.exc_type, args.exc_value, args.exc_traceback, + file=stdout) + + +threading.excepthook = custom_excepthook diff --git a/panoply/constants.py b/panoply/constants.py index a4b0527..a54d744 100644 --- a/panoply/constants.py +++ b/panoply/constants.py @@ -1,2 +1,2 @@ -__version__ = "3.1.4" +__version__ = "3.2.0" __package_name__ = "panoply-python-sdk" diff --git a/panoply/datasource.py b/panoply/datasource.py index 81d2ae2..6fe8439 100644 --- a/panoply/datasource.py +++ b/panoply/datasource.py @@ -1,10 +1,12 @@ import base64 +import concurrent.futures.thread import traceback -from abc import abstractmethod, ABCMeta +from abc import ABCMeta, abstractmethod from concurrent.futures import ThreadPoolExecutor from functools import wraps from threading import Event -from typing import List, Dict, Union +from time import time +from typing import Dict, List, Union import backoff import requests @@ -182,7 +184,7 @@ def wrapper(*args, **kwargs): return _validate_token -def background_progress(message, waiting_interval=10 * 60): +def background_progress(message, waiting_interval=10 * 60, timeout=24*60*60): """ A decorator is used to emit progress while long operation is executed. For example, for database's data sources such operations might be declaration of the cursor or counting number of rows. @@ -196,6 +198,9 @@ def background_progress(message, waiting_interval=10 * 60): waiting_interval : float Time in seconds to wait between progress emitting. Defaults to 10 minutes + timeout : float + Time in seconds for maximum progress emiting time. + Defaults to no 24 hours """ def _background_progress(func): @@ -203,12 +208,19 @@ def _background_progress(func): def wrapper(*args, **kwargs): self = args[0] self.log('Creating background progress emitter') + self.log(f'Timeout is set to {timeout} seconds') finished = Event() + started_at = time() with ThreadPoolExecutor(max_workers=1) as executor: func_future = executor.submit(func, *args, **kwargs) func_future.add_done_callback(lambda future: finished.set()) while not func_future.done(): + if (time() - started_at) > timeout: + self.log("Max waiting time exceeded. Clearing threads.") + executor._threads.clear() + concurrent.futures.thread._threads_queues.clear() + raise Exception("Max waiting time exceeded") self.log(message) self.progress(None, None, message) finished.wait(timeout=waiting_interval) diff --git a/panoply/resources.py b/panoply/resources.py index eb651ff..f939a64 100644 --- a/panoply/resources.py +++ b/panoply/resources.py @@ -1,4 +1,4 @@ -from typing import Optional, List, TypedDict, Dict +from typing import List, Optional, TypedDict class Field(TypedDict): diff --git a/panoply/sdk.py b/panoply/sdk.py index d9dc250..0f81b74 100644 --- a/panoply/sdk.py +++ b/panoply/sdk.py @@ -1,16 +1,15 @@ import base64 import json +import queue +import threading import time -import urllib.request import urllib.error import urllib.parse -import threading -import queue -import logging +import urllib.request from copy import copy -from .constants import __package_name__, __version__ from . import events +from .constants import __package_name__, __version__ MAXSIZE = 1024 * 250 # 250kib FLUSH_TIMEOUT = 2.0 # 2 seconds diff --git a/panoply/ssh.py b/panoply/ssh.py index 1c843ac..b714391 100644 --- a/panoply/ssh.py +++ b/panoply/ssh.py @@ -1,14 +1,23 @@ """ Module for storing SSH related stuff """ +import logging from typing import Dict from paramiko import RSAKey, Ed25519Key, SSHException from sshtunnel import SSHTunnelForwarder from io import StringIO +from sys import stdout from .errors import IncorrectParamError +def get_stdout_only_logger(): + logger = logging.getLogger("STDOUTONLY") + stream_handler = logging.StreamHandler(stream=stdout) + logger.addHandler(stream_handler) + return logger + + class SSHTunnel: """ General SSH tunnel class-component @@ -144,7 +153,8 @@ def _get_server(self, platform_ssh: bool): ssh_username=self.tunnel["username"], ssh_password=self.tunnel.get("password"), ssh_pkey=pkey, - remote_bind_address=(self.host, self.port) + remote_bind_address=(self.host, self.port), + logger=get_stdout_only_logger() ) server.start()