diff --git a/.github/workflows/python-linting.yml b/.github/workflows/python-linting.yml index 25ec8a51f..0e953752b 100644 --- a/.github/workflows/python-linting.yml +++ b/.github/workflows/python-linting.yml @@ -1,13 +1,6 @@ name: Python Linting on: - push: - branches: - - master - paths: - - 'setup.py' - - 'lithops/**' - pull_request: branches: - master @@ -20,7 +13,7 @@ on: jobs: - python_linting: + flake8: runs-on: ubuntu-latest steps: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8bf0571e1..f5d785fd6 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -1,13 +1,6 @@ name: Tests on: - push: - branches: - - master - paths: - - 'setup.py' - - 'lithops/**' - pull_request: branches: - master diff --git a/lithops/config.py b/lithops/config.py index 9d50147e7..0668d3fd5 100644 --- a/lithops/config.py +++ b/lithops/config.py @@ -201,6 +201,9 @@ def default_config(config_file=None, config_data=None, config_overwrite={}, load if 'monitoring' not in config_data['lithops']: config_data['lithops']['monitoring'] = c.MONITORING_DEFAULT + if 'monitoring_interval' not in config_data['lithops']: + config_data['lithops']['monitoring_interval'] = c.MONITORING_INTERVAL + if 'execution_timeout' not in config_data['lithops']: config_data['lithops']['execution_timeout'] = c.EXECUTION_TIMEOUT_DEFAULT @@ -240,6 +243,7 @@ def default_storage_config(config_file=None, config_data=None, backend=None): def extract_storage_config(config): s_config = {} + s_config['monitoring_interval'] = config['lithops']['monitoring_interval'] backend = config['lithops']['storage'] s_config['backend'] = backend s_config[backend] = config[backend] if backend in config and config[backend] else {} diff --git a/lithops/constants.py b/lithops/constants.py index 55606ee44..e6d0cacb8 100644 --- a/lithops/constants.py +++ b/lithops/constants.py @@ -37,7 +37,6 @@ MONITORING_DEFAULT = 'storage' MONITORING_INTERVAL = 2 # seconds -MONITORING_INTERVAL_LH = 0.1 # seconds SERVERLESS_BACKEND_DEFAULT = 'aws_lambda' STANDALONE_BACKEND_DEFAULT = 'aws_ec2' diff --git a/lithops/localhost/v2/localhost.py b/lithops/localhost/v2/localhost.py index ad21a0bb4..a8cf2f3a3 100644 --- a/lithops/localhost/v2/localhost.py +++ b/lithops/localhost/v2/localhost.py @@ -249,6 +249,7 @@ def queue_consumer(work_queue): break process_task(task_payload_str) + logger.debug("Starting Localhost work queue consumer threads") for _ in range(self.worker_processes): t = threading.Thread( target=queue_consumer, @@ -261,6 +262,7 @@ def stop(self, job_keys=None): """ Stops running consumer threads """ + logger.debug("Stopping Localhost work queue consumer threads") for _ in range(self.worker_processes): self.work_queue.put(None) @@ -311,12 +313,13 @@ def run_task(self, job_key, call_id): job_key_call_id = f'{job_key}-{call_id}' task_filename = os.path.join(JOBS_DIR, job_key, call_id + '.task') - logger.debug(f"Going to execute task {job_key_call_id}") + logger.debug(f"Going to execute task process {job_key_call_id}") cmd = [self.runtime_name, RUNNER_FILE, 'run_job', task_filename] process = sp.Popen(cmd, start_new_session=True) self.task_processes[job_key_call_id] = process process.communicate() # blocks until the process finishes del self.task_processes[job_key_call_id] + logger.debug(f"Task process {job_key_call_id} finished") def stop(self, job_keys=None): """ @@ -419,6 +422,7 @@ def run_task(self, job_key, call_id): docker_job_dir = f'/tmp/{USER_TEMP_DIR}/jobs/{job_key}' docker_task_filename = f'{docker_job_dir}/{call_id}.task' + logger.debug(f"Going to execute task process {job_key_call_id}") cmd = f'{self.docker_path} exec {self.container_name} /bin/bash -c ' cmd += f'"python3 /tmp/{USER_TEMP_DIR}/localhost-runner.py ' cmd += f'run_job {docker_task_filename}"' @@ -427,6 +431,7 @@ def run_task(self, job_key, call_id): self.task_processes[job_key_call_id] = process process.communicate() # blocks until the process finishes del self.task_processes[job_key_call_id] + logger.debug(f"Task process {job_key_call_id} finished") def stop(self, job_keys=None): """ diff --git a/lithops/monitor.py b/lithops/monitor.py index e04e7c32c..c06959662 100644 --- a/lithops/monitor.py +++ b/lithops/monitor.py @@ -25,10 +25,6 @@ import threading import concurrent.futures as cf from tblib import pickling_support -from lithops.constants import ( - MONITORING_INTERVAL, - MONITORING_INTERVAL_LH -) pickling_support.install() @@ -466,31 +462,26 @@ class JobMonitor: def __init__(self, executor_id, internal_storage, config=None): self.executor_id = executor_id self.internal_storage = internal_storage + self.storage_config = internal_storage.get_storage_config() + self.storage_backend = internal_storage.backend self.config = config - self.backend_type = self.config['lithops']['monitoring'].lower() if config else 'storage' - self.storage_backend = self.internal_storage.backend + self.type = self.config['lithops']['monitoring'].lower() if config else 'storage' + self.token_bucket_q = queue.Queue() self.monitor = None self.job_chunksize = {} self.MonitorClass = getattr( lithops.monitor, - f'{self.backend_type.capitalize()}Monitor' + f'{self.type.capitalize()}Monitor' ) def start(self, fs, job_id=None, chunksize=None, generate_tokens=False): - if self.backend_type == 'storage': - monitoring_interval = None - if self.config and 'lithops' in self.config: - monitoring_interval = self.config['lithops'].get('monitoring_interval') - if not monitoring_interval: - if self.storage_backend == 'localhost': - monitoring_interval = MONITORING_INTERVAL_LH - else: - monitoring_interval = MONITORING_INTERVAL - bk_config = {'monitoring_interval': monitoring_interval} + if self.type == 'storage': + monitoring_interval = self.storage_config['monitoring_interval'] + monitor_config = {'monitoring_interval': monitoring_interval} else: - bk_config = self.config.get(self.backend_type) + monitor_config = self.config.get(self.type) if job_id: self.job_chunksize[job_id] = chunksize @@ -502,7 +493,7 @@ def start(self, fs, job_id=None, chunksize=None, generate_tokens=False): token_bucket_q=self.token_bucket_q, job_chunksize=self.job_chunksize, generate_tokens=generate_tokens, - config=bk_config + config=monitor_config ) self.monitor.add_futures(fs) diff --git a/lithops/storage/backends/localhost/config.py b/lithops/storage/backends/localhost/config.py index 07bacb77d..e8c447530 100644 --- a/lithops/storage/backends/localhost/config.py +++ b/lithops/storage/backends/localhost/config.py @@ -19,4 +19,5 @@ def load_config(config_data): if 'localhost' not in config_data or config_data['localhost'] is None: config_data['localhost'] = {} + config_data['lithops']['monitoring_interval'] = 0.1 config_data['localhost']['storage_bucket'] = 'storage' diff --git a/lithops/wait.py b/lithops/wait.py index d63885f26..61da721c5 100644 --- a/lithops/wait.py +++ b/lithops/wait.py @@ -131,7 +131,7 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]], internal_storage=executor_data.internal_storage) job_monitor.start(fs=executor_data.futures) - sleep_sec = wait_dur_sec or WAIT_DUR_SEC if job_monitor.backend_type == 'storage' \ + sleep_sec = wait_dur_sec or WAIT_DUR_SEC if job_monitor.type == 'storage' \ and job_monitor.storage_backend != 'localhost' else 0.1 if return_when == ALWAYS: