Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Localhost] Set monitoring_interval to 0.1 in the localhost storage backend #1350

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions .github/workflows/python-linting.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
name: Python Linting

on:
push:
branches:
- master
paths:
- 'setup.py'
- 'lithops/**'

pull_request:
branches:
- master
Expand All @@ -20,7 +13,7 @@ on:

jobs:

python_linting:
flake8:
runs-on: ubuntu-latest

steps:
Expand Down
7 changes: 0 additions & 7 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
name: Tests

on:
push:
branches:
- master
paths:
- 'setup.py'
- 'lithops/**'

pull_request:
branches:
- master
Expand Down
4 changes: 4 additions & 0 deletions lithops/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ def default_config(config_file=None, config_data=None, config_overwrite={}, load
if 'monitoring' not in config_data['lithops']:
config_data['lithops']['monitoring'] = c.MONITORING_DEFAULT

if 'monitoring_interval' not in config_data['lithops']:
config_data['lithops']['monitoring_interval'] = c.MONITORING_INTERVAL

if 'execution_timeout' not in config_data['lithops']:
config_data['lithops']['execution_timeout'] = c.EXECUTION_TIMEOUT_DEFAULT

Expand Down Expand Up @@ -240,6 +243,7 @@ def default_storage_config(config_file=None, config_data=None, backend=None):

def extract_storage_config(config):
s_config = {}
s_config['monitoring_interval'] = config['lithops']['monitoring_interval']
backend = config['lithops']['storage']
s_config['backend'] = backend
s_config[backend] = config[backend] if backend in config and config[backend] else {}
Expand Down
1 change: 0 additions & 1 deletion lithops/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

MONITORING_DEFAULT = 'storage'
MONITORING_INTERVAL = 2 # seconds
MONITORING_INTERVAL_LH = 0.1 # seconds

SERVERLESS_BACKEND_DEFAULT = 'aws_lambda'
STANDALONE_BACKEND_DEFAULT = 'aws_ec2'
Expand Down
7 changes: 6 additions & 1 deletion lithops/localhost/v2/localhost.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ def queue_consumer(work_queue):
break
process_task(task_payload_str)

logger.debug("Starting Localhost work queue consumer threads")
for _ in range(self.worker_processes):
t = threading.Thread(
target=queue_consumer,
Expand All @@ -261,6 +262,7 @@ def stop(self, job_keys=None):
"""
Stops running consumer threads
"""
logger.debug("Stopping Localhost work queue consumer threads")
for _ in range(self.worker_processes):
self.work_queue.put(None)

Expand Down Expand Up @@ -311,12 +313,13 @@ def run_task(self, job_key, call_id):
job_key_call_id = f'{job_key}-{call_id}'
task_filename = os.path.join(JOBS_DIR, job_key, call_id + '.task')

logger.debug(f"Going to execute task {job_key_call_id}")
logger.debug(f"Going to execute task process {job_key_call_id}")
cmd = [self.runtime_name, RUNNER_FILE, 'run_job', task_filename]
process = sp.Popen(cmd, start_new_session=True)
self.task_processes[job_key_call_id] = process
process.communicate() # blocks until the process finishes
del self.task_processes[job_key_call_id]
logger.debug(f"Task process {job_key_call_id} finished")

def stop(self, job_keys=None):
"""
Expand Down Expand Up @@ -419,6 +422,7 @@ def run_task(self, job_key, call_id):
docker_job_dir = f'/tmp/{USER_TEMP_DIR}/jobs/{job_key}'
docker_task_filename = f'{docker_job_dir}/{call_id}.task'

logger.debug(f"Going to execute task process {job_key_call_id}")
cmd = f'{self.docker_path} exec {self.container_name} /bin/bash -c '
cmd += f'"python3 /tmp/{USER_TEMP_DIR}/localhost-runner.py '
cmd += f'run_job {docker_task_filename}"'
Expand All @@ -427,6 +431,7 @@ def run_task(self, job_key, call_id):
self.task_processes[job_key_call_id] = process
process.communicate() # blocks until the process finishes
del self.task_processes[job_key_call_id]
logger.debug(f"Task process {job_key_call_id} finished")

def stop(self, job_keys=None):
"""
Expand Down
29 changes: 10 additions & 19 deletions lithops/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
import threading
import concurrent.futures as cf
from tblib import pickling_support
from lithops.constants import (
MONITORING_INTERVAL,
MONITORING_INTERVAL_LH
)

pickling_support.install()

Expand Down Expand Up @@ -466,31 +462,26 @@ class JobMonitor:
def __init__(self, executor_id, internal_storage, config=None):
self.executor_id = executor_id
self.internal_storage = internal_storage
self.storage_config = internal_storage.get_storage_config()
self.storage_backend = internal_storage.backend
self.config = config
self.backend_type = self.config['lithops']['monitoring'].lower() if config else 'storage'
self.storage_backend = self.internal_storage.backend
self.type = self.config['lithops']['monitoring'].lower() if config else 'storage'

self.token_bucket_q = queue.Queue()
self.monitor = None
self.job_chunksize = {}

self.MonitorClass = getattr(
lithops.monitor,
f'{self.backend_type.capitalize()}Monitor'
f'{self.type.capitalize()}Monitor'
)

def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
if self.backend_type == 'storage':
monitoring_interval = None
if self.config and 'lithops' in self.config:
monitoring_interval = self.config['lithops'].get('monitoring_interval')
if not monitoring_interval:
if self.storage_backend == 'localhost':
monitoring_interval = MONITORING_INTERVAL_LH
else:
monitoring_interval = MONITORING_INTERVAL
bk_config = {'monitoring_interval': monitoring_interval}
if self.type == 'storage':
monitoring_interval = self.storage_config['monitoring_interval']
monitor_config = {'monitoring_interval': monitoring_interval}
else:
bk_config = self.config.get(self.backend_type)
monitor_config = self.config.get(self.type)

if job_id:
self.job_chunksize[job_id] = chunksize
Expand All @@ -502,7 +493,7 @@ def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
token_bucket_q=self.token_bucket_q,
job_chunksize=self.job_chunksize,
generate_tokens=generate_tokens,
config=bk_config
config=monitor_config
)

self.monitor.add_futures(fs)
Expand Down
1 change: 1 addition & 0 deletions lithops/storage/backends/localhost/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ def load_config(config_data):
if 'localhost' not in config_data or config_data['localhost'] is None:
config_data['localhost'] = {}

config_data['lithops']['monitoring_interval'] = 0.1
config_data['localhost']['storage_bucket'] = 'storage'
2 changes: 1 addition & 1 deletion lithops/wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]],
internal_storage=executor_data.internal_storage)
job_monitor.start(fs=executor_data.futures)

sleep_sec = wait_dur_sec or WAIT_DUR_SEC if job_monitor.backend_type == 'storage' \
sleep_sec = wait_dur_sec or WAIT_DUR_SEC if job_monitor.type == 'storage' \
and job_monitor.storage_backend != 'localhost' else 0.1

if return_when == ALWAYS:
Expand Down