Skip to content

Commit

Permalink
Merge pull request #67 from kirill-panoply/synchronize-latest-versions
Browse files Browse the repository at this point in the history
[PAN-2118] Synchronize latest versions
  • Loading branch information
kirill-panoply authored Oct 14, 2024
2 parents bd63ee7 + 74c8ed8 commit 2db2898
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 11 deletions.
28 changes: 28 additions & 0 deletions panoply/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
import logging
from sys import stdout
from traceback import print_exception as _print_exception

from .datasource import *
from .records import *
from .resources import *
from .sdk import *
from .ssh import SSHTunnel

logging.basicConfig(stream=stdout, format='%(levelname)s: %(message)s')


def custom_excepthook(args):
"""
Handle uncaught Thread.run() exception
and print error text to STDOUT instead of STDERR.
"It's always assumed that
the runnner is single-threaded and synchronuous such that `result` events
are only assigned to the last executed request".
see https://github.com/panoplyio/legacy-source-wrapper/blob/master/src/sources-runner/index.js#L74
"""
if args.exc_type == SystemExit:
# silently ignore SystemExit
return

logging.error("Caught an exception in thread:")
_print_exception(args.exc_type, args.exc_value, args.exc_traceback,
file=stdout)


threading.excepthook = custom_excepthook
2 changes: 1 addition & 1 deletion panoply/constants.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__version__ = "3.1.4"
__version__ = "3.2.0"
__package_name__ = "panoply-python-sdk"
18 changes: 15 additions & 3 deletions panoply/datasource.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import base64
import concurrent.futures.thread
import traceback
from abc import abstractmethod, ABCMeta
from abc import ABCMeta, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from threading import Event
from typing import List, Dict, Union
from time import time
from typing import Dict, List, Union

import backoff
import requests
Expand Down Expand Up @@ -182,7 +184,7 @@ def wrapper(*args, **kwargs):
return _validate_token


def background_progress(message, waiting_interval=10 * 60):
def background_progress(message, waiting_interval=10 * 60, timeout=24*60*60):
""" A decorator is used to emit progress while long operation is executed.
For example, for database's data sources such operations might be
declaration of the cursor or counting number of rows.
Expand All @@ -196,19 +198,29 @@ def background_progress(message, waiting_interval=10 * 60):
waiting_interval : float
Time in seconds to wait between progress emitting.
Defaults to 10 minutes
timeout : float
Time in seconds for maximum progress emiting time.
Defaults to no 24 hours
"""

def _background_progress(func):
@wraps(func)
def wrapper(*args, **kwargs):
self = args[0]
self.log('Creating background progress emitter')
self.log(f'Timeout is set to {timeout} seconds')
finished = Event()
started_at = time()
with ThreadPoolExecutor(max_workers=1) as executor:
func_future = executor.submit(func, *args, **kwargs)
func_future.add_done_callback(lambda future: finished.set())

while not func_future.done():
if (time() - started_at) > timeout:
self.log("Max waiting time exceeded. Clearing threads.")
executor._threads.clear()
concurrent.futures.thread._threads_queues.clear()
raise Exception("Max waiting time exceeded")
self.log(message)
self.progress(None, None, message)
finished.wait(timeout=waiting_interval)
Expand Down
2 changes: 1 addition & 1 deletion panoply/resources.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional, List, TypedDict, Dict
from typing import List, Optional, TypedDict


class Field(TypedDict):
Expand Down
9 changes: 4 additions & 5 deletions panoply/sdk.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import base64
import json
import queue
import threading
import time
import urllib.request
import urllib.error
import urllib.parse
import threading
import queue
import logging
import urllib.request
from copy import copy
from .constants import __package_name__, __version__

from . import events
from .constants import __package_name__, __version__

MAXSIZE = 1024 * 250 # 250kib
FLUSH_TIMEOUT = 2.0 # 2 seconds
Expand Down
12 changes: 11 additions & 1 deletion panoply/ssh.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
"""
Module for storing SSH related stuff
"""
import logging
from typing import Dict
from paramiko import RSAKey, Ed25519Key, SSHException
from sshtunnel import SSHTunnelForwarder
from io import StringIO
from sys import stdout

from .errors import IncorrectParamError


def get_stdout_only_logger():
logger = logging.getLogger("STDOUTONLY")
stream_handler = logging.StreamHandler(stream=stdout)
logger.addHandler(stream_handler)
return logger


class SSHTunnel:
"""
General SSH tunnel class-component
Expand Down Expand Up @@ -144,7 +153,8 @@ def _get_server(self, platform_ssh: bool):
ssh_username=self.tunnel["username"],
ssh_password=self.tunnel.get("password"),
ssh_pkey=pkey,
remote_bind_address=(self.host, self.port)
remote_bind_address=(self.host, self.port),
logger=get_stdout_only_logger()
)
server.start()

Expand Down

0 comments on commit 2db2898

Please sign in to comment.