Skip to content

Commit

Permalink
Merge pull request #1350 from JosepSampe/lithops-dev
Browse files Browse the repository at this point in the history
[Localhost] Set monitoring_interval to 0.1 in the localhost storage backend
  • Loading branch information
JosepSampe authored May 13, 2024
2 parents 233c744 + 8d72618 commit 182f78d
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 37 deletions.
9 changes: 1 addition & 8 deletions .github/workflows/python-linting.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
name: Python Linting

on:
push:
branches:
- master
paths:
- 'setup.py'
- 'lithops/**'

pull_request:
branches:
- master
Expand All @@ -20,7 +13,7 @@ on:

jobs:

python_linting:
flake8:
runs-on: ubuntu-latest

steps:
Expand Down
7 changes: 0 additions & 7 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
name: Tests

on:
push:
branches:
- master
paths:
- 'setup.py'
- 'lithops/**'

pull_request:
branches:
- master
Expand Down
4 changes: 4 additions & 0 deletions lithops/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ def default_config(config_file=None, config_data=None, config_overwrite={}, load
if 'monitoring' not in config_data['lithops']:
config_data['lithops']['monitoring'] = c.MONITORING_DEFAULT

if 'monitoring_interval' not in config_data['lithops']:
config_data['lithops']['monitoring_interval'] = c.MONITORING_INTERVAL

if 'execution_timeout' not in config_data['lithops']:
config_data['lithops']['execution_timeout'] = c.EXECUTION_TIMEOUT_DEFAULT

Expand Down Expand Up @@ -240,6 +243,7 @@ def default_storage_config(config_file=None, config_data=None, backend=None):

def extract_storage_config(config):
s_config = {}
s_config['monitoring_interval'] = config['lithops']['monitoring_interval']
backend = config['lithops']['storage']
s_config['backend'] = backend
s_config[backend] = config[backend] if backend in config and config[backend] else {}
Expand Down
1 change: 0 additions & 1 deletion lithops/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

MONITORING_DEFAULT = 'storage'
MONITORING_INTERVAL = 2 # seconds
MONITORING_INTERVAL_LH = 0.1 # seconds

SERVERLESS_BACKEND_DEFAULT = 'aws_lambda'
STANDALONE_BACKEND_DEFAULT = 'aws_ec2'
Expand Down
7 changes: 6 additions & 1 deletion lithops/localhost/v2/localhost.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ def queue_consumer(work_queue):
break
process_task(task_payload_str)

logger.debug("Starting Localhost work queue consumer threads")
for _ in range(self.worker_processes):
t = threading.Thread(
target=queue_consumer,
Expand All @@ -261,6 +262,7 @@ def stop(self, job_keys=None):
"""
Stops running consumer threads
"""
logger.debug("Stopping Localhost work queue consumer threads")
for _ in range(self.worker_processes):
self.work_queue.put(None)

Expand Down Expand Up @@ -311,12 +313,13 @@ def run_task(self, job_key, call_id):
job_key_call_id = f'{job_key}-{call_id}'
task_filename = os.path.join(JOBS_DIR, job_key, call_id + '.task')

logger.debug(f"Going to execute task {job_key_call_id}")
logger.debug(f"Going to execute task process {job_key_call_id}")
cmd = [self.runtime_name, RUNNER_FILE, 'run_job', task_filename]
process = sp.Popen(cmd, start_new_session=True)
self.task_processes[job_key_call_id] = process
process.communicate() # blocks until the process finishes
del self.task_processes[job_key_call_id]
logger.debug(f"Task process {job_key_call_id} finished")

def stop(self, job_keys=None):
"""
Expand Down Expand Up @@ -419,6 +422,7 @@ def run_task(self, job_key, call_id):
docker_job_dir = f'/tmp/{USER_TEMP_DIR}/jobs/{job_key}'
docker_task_filename = f'{docker_job_dir}/{call_id}.task'

logger.debug(f"Going to execute task process {job_key_call_id}")
cmd = f'{self.docker_path} exec {self.container_name} /bin/bash -c '
cmd += f'"python3 /tmp/{USER_TEMP_DIR}/localhost-runner.py '
cmd += f'run_job {docker_task_filename}"'
Expand All @@ -427,6 +431,7 @@ def run_task(self, job_key, call_id):
self.task_processes[job_key_call_id] = process
process.communicate() # blocks until the process finishes
del self.task_processes[job_key_call_id]
logger.debug(f"Task process {job_key_call_id} finished")

def stop(self, job_keys=None):
"""
Expand Down
29 changes: 10 additions & 19 deletions lithops/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
import threading
import concurrent.futures as cf
from tblib import pickling_support
from lithops.constants import (
MONITORING_INTERVAL,
MONITORING_INTERVAL_LH
)

pickling_support.install()

Expand Down Expand Up @@ -466,31 +462,26 @@ class JobMonitor:
def __init__(self, executor_id, internal_storage, config=None):
self.executor_id = executor_id
self.internal_storage = internal_storage
self.storage_config = internal_storage.get_storage_config()
self.storage_backend = internal_storage.backend
self.config = config
self.backend_type = self.config['lithops']['monitoring'].lower() if config else 'storage'
self.storage_backend = self.internal_storage.backend
self.type = self.config['lithops']['monitoring'].lower() if config else 'storage'

self.token_bucket_q = queue.Queue()
self.monitor = None
self.job_chunksize = {}

self.MonitorClass = getattr(
lithops.monitor,
f'{self.backend_type.capitalize()}Monitor'
f'{self.type.capitalize()}Monitor'
)

def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
if self.backend_type == 'storage':
monitoring_interval = None
if self.config and 'lithops' in self.config:
monitoring_interval = self.config['lithops'].get('monitoring_interval')
if not monitoring_interval:
if self.storage_backend == 'localhost':
monitoring_interval = MONITORING_INTERVAL_LH
else:
monitoring_interval = MONITORING_INTERVAL
bk_config = {'monitoring_interval': monitoring_interval}
if self.type == 'storage':
monitoring_interval = self.storage_config['monitoring_interval']
monitor_config = {'monitoring_interval': monitoring_interval}
else:
bk_config = self.config.get(self.backend_type)
monitor_config = self.config.get(self.type)

if job_id:
self.job_chunksize[job_id] = chunksize
Expand All @@ -502,7 +493,7 @@ def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
token_bucket_q=self.token_bucket_q,
job_chunksize=self.job_chunksize,
generate_tokens=generate_tokens,
config=bk_config
config=monitor_config
)

self.monitor.add_futures(fs)
Expand Down
1 change: 1 addition & 0 deletions lithops/storage/backends/localhost/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ def load_config(config_data):
if 'localhost' not in config_data or config_data['localhost'] is None:
config_data['localhost'] = {}

config_data['lithops']['monitoring_interval'] = 0.1
config_data['localhost']['storage_bucket'] = 'storage'
2 changes: 1 addition & 1 deletion lithops/wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]],
internal_storage=executor_data.internal_storage)
job_monitor.start(fs=executor_data.futures)

sleep_sec = wait_dur_sec or WAIT_DUR_SEC if job_monitor.backend_type == 'storage' \
sleep_sec = wait_dur_sec or WAIT_DUR_SEC if job_monitor.type == 'storage' \
and job_monitor.storage_backend != 'localhost' else 0.1

if return_when == ALWAYS:
Expand Down

0 comments on commit 182f78d

Please sign in to comment.