Merge pull request #1287 from JosepSampe/lithops-dev
runtime docs update
JosepSampe authored Mar 26, 2024
2 parents af51f09 + c51e5b2 commit 309e7f2
Showing 12 changed files with 146 additions and 161 deletions.
2 changes: 2 additions & 0 deletions docs/source/compute_config/gcp_cloudrun.md
@@ -29,6 +29,8 @@ python3 -m pip install lithops[gcp]

9. Enable the **Cloud Run API**: Navigate to the *APIs & services* tab in the menu. Click *ENABLE APIS AND SERVICES*. Search for "Cloud Run API" in the search bar. Click *Enable*.

10. Enable the **Artifact Registry API**: Navigate to the *APIs & services* tab in the menu. Click *ENABLE APIS AND SERVICES*. Search for "Artifact Registry API" in the search bar. Click *Enable*.
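
Alternatively, assuming the Google Cloud SDK is installed and a project is selected, both APIs can be enabled from the command line with `gcloud services enable run.googleapis.com artifactregistry.googleapis.com`.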

## Configuration

1. Edit your lithops config and add the following keys:
2 changes: 2 additions & 0 deletions docs/source/compute_config/gcp_functions.md
@@ -30,6 +30,8 @@ python3 -m pip install lithops[gcp]

9. Enable the **Cloud Functions API**: Navigate to the *APIs & services* tab in the menu. Click *ENABLE APIS AND SERVICES*. Search for "Cloud Functions API" in the search bar. Click *Enable*.

10. Enable the **Artifact Registry API**: Navigate to the *APIs & services* tab in the menu. Click *ENABLE APIS AND SERVICES*. Search for "Artifact Registry API" in the search bar. Click *Enable*.

## Configuration

1. Edit your lithops config and add the following keys:
1 change: 0 additions & 1 deletion lithops/constants.py
@@ -97,7 +97,6 @@
'use_gpu': False,
'start_timeout': 300,
'auto_dismantle': True,
'master_as_worker': False,
'soft_dismantle_timeout': 300,
'hard_dismantle_timeout': 3600
}
26 changes: 8 additions & 18 deletions lithops/serverless/backends/gcp_functions/config.py
@@ -53,33 +53,23 @@
}

REQUIREMENTS_FILE = """
numpy
scipy
scikit-learn
pandas
google-cloud
google-cloud-storage
google-cloud-pubsub
google-auth
google-api-python-client
certifi
chardet
docutils
httplib2
idna
jmespath
kafka-python
lxml
pika
redis
requests
numpy
six
urllib3
virtualenv
PyYAML
requests
redis
pika
scikit-learn
diskcache
cloudpickle
ps-mem
tblib
PyYAML
urllib3
psutil
"""

106 changes: 81 additions & 25 deletions lithops/standalone/master.py
@@ -131,7 +131,7 @@ def get_worker(worker):
ttd = get_worker_ttd(private_ip)
ttd = ttd if ttd in ["Unknown", "Disabled"] else ttd + "s"
timestamp = float(worker_data['created'])
created = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S UTC')
created = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S UTC')
instance_type = worker_data['instance_type']
worker_processes = str(worker_data['worker_processes'])
exec_mode = worker_data['exec_mode']
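A brief note on the `datetime` change in the hunk above: `datetime.utcfromtimestamp()` is deprecated since Python 3.12, but plain `datetime.fromtimestamp()` returns local time even though the formatted string still ends in "UTC". A timezone-aware sketch (an alternative, not what this commit uses) would be:

```python
from datetime import datetime, timezone

timestamp = 1711411200.0  # example epoch seconds
created = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
print(created)  # 2024-03-26 00:00:00 UTC
```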
@@ -204,17 +204,11 @@ def check_worker(worker_data):
return response


def setup_worker(standalone_handler, worker_info, work_queue_name):
def save_worker(worker, standalone_config, work_queue_name):
"""
Run worker setup process and Installs all the Lithops
dependencies into the worker
Saves the worker instance with the provided data in redis
"""
worker = standalone_handler.backend.get_instance(**worker_info, public=False)

if redis_client.hget(f"worker:{worker.name}", 'status') == WorkerStatus.ACTIVE.value:
return

config = copy.deepcopy(standalone_handler.config)
config = copy.deepcopy(standalone_config)
del config[config['backend']]
config = {key: str(value) if isinstance(value, bool) else value for key, value in config.items()}

@@ -225,24 +219,36 @@ def setup_worker(standalone_handler, worker_info, work_queue_name):

redis_client.hset(f"worker:{worker.name}", mapping={
'name': worker.name,
'status': JobStatus.SUBMITTED.value,
'status': WorkerStatus.STARTING.value,
'private_ip': worker.private_ip or '',
'instance_id': worker.instance_id,
'instance_type': instance_type,
'worker_processes': worker_processes,
'created': str(time.time()),
'ssh_credentials': json.dumps(worker.ssh_credentials),
'queue_name': work_queue_name,
'err': "", **config,
})
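
A minimal sketch of how a record written by `save_worker()` could be read back, assuming a local Redis instance and a hypothetical worker named `worker-0` (the field names mirror the `hset()` mapping above):

```python
import redis

redis_client = redis.Redis(decode_responses=True)

# Fields such as 'status', 'queue_name' and 'instance_type' come from the
# mapping written by save_worker(); 'worker-0' is a made-up worker name.
worker = redis_client.hgetall('worker:worker-0')
print(worker.get('status'), worker.get('queue_name'), worker.get('instance_type'))
```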


def setup_worker_create_reuse(standalone_handler, worker_info, work_queue_name):
"""
Run the worker setup process and install all the Lithops dependencies into it
"""
worker = standalone_handler.backend.get_instance(**worker_info, public=False)

if redis_client.hget(f"worker:{worker.name}", 'status') == WorkerStatus.ACTIVE.value:
return

save_worker(worker, standalone_handler.config, work_queue_name)

max_instance_create_retries = worker.config.get('worker_create_retries', MAX_INSTANCE_CREATE_RETRIES)

def wait_worker_ready(worker):
instance_ready_retries = 1

while instance_ready_retries <= max_instance_create_retries:
try:
redis_client.hset(f"worker:{worker.name}", 'status', WorkerStatus.STARTING.value)
worker.wait_ready()
break
except TimeoutError as e: # VM not started in time
@@ -317,6 +323,46 @@ def wait_worker_ready(worker):
raise e


def setup_worker_consume(standalone_handler, worker_info, work_queue_name):
"""
Run the worker setup process in the case of Consume mode
"""
instance = standalone_handler.backend.get_instance(**worker_info, public=False)
instance.private_ip = master_ip

if redis_client.hget(f"worker:{instance.name}", 'status') == WorkerStatus.ACTIVE.value:
return

save_worker(instance, standalone_handler.config, work_queue_name)

try:
logger.debug(f'Setting up the worker in the current {instance}')
vm_data = {
'name': instance.name,
'private_ip': instance.private_ip,
'instance_id': instance.instance_id,
'ssh_credentials': instance.ssh_credentials,
'instance_type': instance.instance_type,
'master_ip': master_ip,
'work_queue_name': work_queue_name,
'lithops_version': __version__
}
worker_setup_script = "/tmp/install_lithops.sh"
script = get_worker_setup_script(standalone_handler.config, vm_data)

with open(worker_setup_script, 'w') as wis:
wis.write(script)

redis_client.hset(f"worker:{instance.name}", 'status', WorkerStatus.INSTALLING.value)
os.chmod(worker_setup_script, 0o755)
os.system("sudo " + worker_setup_script)

except Exception as e:
redis_client.hset(f"worker:{instance.name}", 'status', WorkerStatus.ERROR.value)
instance.err = f'Unable to setup lithops in the VM: {str(e)}'
raise e


def handle_workers(job_payload, workers, work_queue_name):
"""
Creates the workers (if any)
@@ -332,22 +378,33 @@ def handle_workers(job_payload, workers, work_queue_name):
futures = []
total_correct = 0

with ThreadPoolExecutor(len(workers)) as executor:
for worker_info in workers:
future = executor.submit(
setup_worker,
if standalone_config['exec_mode'] == StandaloneMode.CONSUME.value:
try:
setup_worker_consume(
standalone_handler,
worker_info,
workers[0],
work_queue_name
)
futures.append(future)

for future in cf.as_completed(futures):
try:
future.result()
total_correct += 1
except Exception as e:
logger.error(e)
else:
with ThreadPoolExecutor(len(workers)) as executor:
for worker_info in workers:
future = executor.submit(
setup_worker_create_reuse,
standalone_handler,
worker_info,
work_queue_name
)
futures.append(future)

for future in cf.as_completed(futures):
try:
future.result()
total_correct += 1
except Exception as e:
logger.error(e)

logger.debug(
f'{total_correct} of {len(workers)} workers started '
@@ -426,7 +483,7 @@ def list_jobs():
timestamp = float(job_data['submitted'])
runtime = job_data['runtime_name']
worker_type = job_data['worker_type'] if exec_mode != StandaloneMode.CONSUME.value else 'VM'
submitted = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S UTC')
submitted = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S UTC')
total_tasks = str(job_data['total_tasks'])
done_tasks = str(redis_client.llen(f'tasksdone:{job_key}'))
job = (job_key, func_name, submitted, worker_type, runtime, f'{done_tasks}/{total_tasks}', status)
@@ -487,6 +544,7 @@ def run():

exec_mode = job_payload['config']['standalone']['exec_mode']
exec_mode = StandaloneMode[exec_mode.upper()]
workers = job_payload.pop('worker_instances')

if exec_mode == StandaloneMode.CONSUME:
queue_name = f'wq:localhost:{runtime_name.replace("/", "-")}'
@@ -497,8 +555,6 @@
worker_wp = job_payload['worker_processes']
queue_name = f'wq:{worker_it}-{worker_wp}-{runtime_name.replace("/", "-")}'

workers = job_payload.pop('worker_instances')

Thread(target=handle_job, args=(job_payload, queue_name)).start()
Thread(target=handle_workers, args=(job_payload, workers, queue_name)).start()
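
For reference, the sketch below shows how the non-consume work-queue name above resolves for a sample payload. The `worker_instance_type` key and the runtime name are assumptions made for the example; only `worker_processes` appears in the hunk.

```python
job_payload = {'worker_instance_type': 'e2-standard-2', 'worker_processes': 2}  # hypothetical payload
runtime_name = 'default-runtime-v310'

worker_it = job_payload['worker_instance_type']
worker_wp = job_payload['worker_processes']
queue_name = f'wq:{worker_it}-{worker_wp}-{runtime_name.replace("/", "-")}'
print(queue_name)  # wq:e2-standard-2-2-default-runtime-v310
```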

1 change: 0 additions & 1 deletion lithops/standalone/standalone.py
@@ -58,7 +58,6 @@ def __init__(self, standalone_config):
self.backend_name = self.config['backend']
self.start_timeout = self.config['start_timeout']
self.exec_mode = StandaloneMode[self.config['exec_mode'].upper()]
self.use_master_as_worker = self.config.get('master_as_worker', False)
self.is_lithops_worker = is_lithops_worker()

module_location = f'lithops.standalone.backends.{self.backend_name}'
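The `StandaloneMode[...upper()]` pattern used here (and in `master.py` above) is a plain name-based enum lookup. A minimal sketch, with member values assumed for illustration since the real enum ships with Lithops:

```python
from enum import Enum

class StandaloneMode(Enum):
    CONSUME = 'consume'
    CREATE = 'create'
    REUSE = 'reuse'

exec_mode = StandaloneMode['reuse'.upper()]                # name-based lookup
print(exec_mode is StandaloneMode.REUSE, exec_mode.value)  # True reuse
```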
12 changes: 7 additions & 5 deletions lithops/standalone/utils.py
@@ -67,8 +67,9 @@ class LithopsValidationError(Exception):
StartLimitIntervalSec=5
[Service]
ExecStart={0}
ExecStop={1}
ExecStartPre={0}
ExecStart={1}
ExecStop={2}
Restart=on-failure
[Install]
@@ -214,17 +215,18 @@ def get_worker_setup_script(config, vm_data):
this script is expected to be executed only from Master VM
"""
if config['runtime'].startswith(('python', '/')):
cmd_pre = cmd_stop = "id"
cmd_start = f"/usr/bin/python3 {SA_INSTALL_DIR}/worker.py"
cmd_stop = "id"
else:
cmd_pre = '-docker rm -f lithops_worker'
cmd_start = 'docker run --rm --name lithops_worker '
cmd_start += '--gpus all ' if config["use_gpu"] else ''
cmd_start += f'--user {os.getuid()}:{os.getgid()} '
cmd_start += f'--env USER={os.getenv("USER", "root")} --env DOCKER=Lithops '
cmd_start += f'-p {SA_WORKER_SERVICE_PORT}:{SA_WORKER_SERVICE_PORT} '
cmd_start += f'-v {SA_INSTALL_DIR}:{SA_INSTALL_DIR} -v /tmp:/tmp '
cmd_start += f'--entrypoint "python3" {config["runtime"]} {SA_INSTALL_DIR}/worker.py'
cmd_stop = "docker rm -f lithops_worker"
cmd_stop = '-docker rm -f lithops_worker'

script = docker_login(config)
script += f"""
@@ -237,7 +239,7 @@
setup_host >> {SA_SETUP_LOG_FILE} 2>&1;
USER_HOME=$(eval echo ~${{SUDO_USER}});
setup_service(){{
echo '{WORKER_SERVICE_FILE.format(cmd_start, cmd_stop)}' > /etc/systemd/system/{WORKER_SERVICE_NAME};
echo '{WORKER_SERVICE_FILE.format(cmd_pre, cmd_start, cmd_stop)}' > /etc/systemd/system/{WORKER_SERVICE_NAME};
chmod 644 /etc/systemd/system/{WORKER_SERVICE_NAME};
systemctl daemon-reload;
systemctl stop {WORKER_SERVICE_NAME};
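To make the template change above concrete, the sketch below shows how `cmd_pre`, `cmd_start` and `cmd_stop` are substituted into the worker systemd unit with `str.format`. The unit file is abbreviated and `SA_INSTALL_DIR` is an assumed path; the real template in `lithops/standalone/utils.py` contains additional directives.

```python
SA_INSTALL_DIR = '/opt/lithops'  # assumed install dir, for illustration only

WORKER_SERVICE_FILE = """\
[Service]
ExecStartPre={0}
ExecStart={1}
ExecStop={2}
Restart=on-failure
"""

# The "python runtime" branch of get_worker_setup_script():
cmd_pre = cmd_stop = 'id'
cmd_start = f'/usr/bin/python3 {SA_INSTALL_DIR}/worker.py'

print(WORKER_SERVICE_FILE.format(cmd_pre, cmd_start, cmd_stop))
```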
27 changes: 19 additions & 8 deletions lithops/standalone/worker.py
@@ -16,6 +16,7 @@
#

import os
import time
import json
import redis
import flask
@@ -108,7 +109,7 @@ def notify_worker_active(worker_name):

def notify_worker_idle(worker_name):
try:
data = {'status': WorkerStatus.IDLE.value, 'runtime': ''}
data = {'status': WorkerStatus.IDLE.value, 'runtime': '', 'worker_processes': ''}
redis_client.hset(f"worker:{worker_name}", mapping=data)
except Exception as e:
logger.error(e)
@@ -153,17 +154,25 @@ def redis_queue_consumer(pid, work_queue_name, exec_mode, backend):
logger.info(f"Redis consumer process {pid} started")

while True:
if exec_mode in StandaloneMode.REUSE.value:
if exec_mode == StandaloneMode.CONSUME.value:
task_payload_str = redis_client.rpop(work_queue_name)
if not task_payload_str:
time.sleep(1)
if all(worker['status'] == WorkerStatus.IDLE.value
for worker in worker_threads.values()):
break
continue
elif exec_mode == StandaloneMode.REUSE.value:
key, task_payload_str = redis_client.brpop(work_queue_name)
else:
task_payload_str = redis_client.rpop(work_queue_name)
if task_payload_str is None:
return

task_payload = json.loads(task_payload_str)
break

worker_threads[pid]['status'] = WorkerStatus.BUSY.value

task_payload = json.loads(task_payload_str)

executor_id = task_payload['executor_id']
job_id = task_payload['job_id']
job_key = task_payload['job_key']
@@ -172,7 +181,7 @@ def redis_queue_consumer(pid, work_queue_name, exec_mode, backend):

try:
logger.debug(f'ExecutorID {executor_id} | JobID {job_id} - Running '
f'CallID {call_id} in the local worker')
f'CallID {call_id} in the local worker (consumer {pid})')
notify_task_start(job_key, call_id)

if budget_keeper:
@@ -207,6 +216,8 @@ def redis_queue_consumer(pid, work_queue_name, exec_mode, backend):

worker_threads[pid]['status'] = WorkerStatus.IDLE.value

logger.info(f"Redis consumer process {pid} finished")


def run_worker():
global redis_client
@@ -273,9 +284,9 @@ def run_wsgi():
if standalone_config['exec_mode'] == StandaloneMode.CONSUME.value:
notify_worker_idle(worker_data['name'])

# run_worker will run forever in reuse mode. In create mode it will
# run_worker will run forever in reuse mode. In create and consume mode it will
# run until there are no more tasks in the queue.
logger.debug('Worker finished')
logger.debug('Worker service finished')

try:
# Try to stop the current worker VM once no more pending tasks to run
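The consumer changes above hinge on the difference between Redis' non-blocking and blocking pops: `rpop` returns immediately (and `None` when the queue is empty), while `brpop` waits for an item and returns a `(queue, item)` pair. A self-contained illustration with redis-py, assuming a local Redis instance:

```python
import json
import redis

r = redis.Redis(decode_responses=True)
r.lpush('wq:example', json.dumps({'call_id': '00000'}))

item = r.rpop('wq:example')            # non-blocking: payload string, or None if empty
r.lpush('wq:example', json.dumps({'call_id': '00001'}))
queue, item = r.brpop('wq:example')    # blocking: waits, then returns (queue, payload)
print(queue, json.loads(item)['call_id'])
```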
4 changes: 1 addition & 3 deletions runtime/aws_ec2/README.md
@@ -1,11 +1,9 @@
# Lithops runtime for AWS EC2

In AWS EC2, you can run functions by using a Virtual machine (VM). In the VM, functions run using parallel processes. In this case, it is not needed to install anything in the remote VMs since Lithops does this process automatically the first time you use them. However, use a custom VM it is a preferable approach, since using a pre-built custom image will greatly improve the overall execution time. To benefit from this approach, follow the following steps:
In AWS EC2, you can execute functions using a Virtual Machine (VM). These functions run as parallel processes within the VM. When using Lithops for the first time, there's no need to manually install anything on the remote VMs, as Lithops handles this process automatically. However, utilizing a custom VM is preferable, as employing a pre-built custom image significantly improves overall execution time. To benefit from this approach, follow these steps:

## Option 1:

**Note**: This is a beta feature. Please open an issue if you encounter any errors using this way of creating VM images in AWS EC2.

For building the default VM image that contains all dependencies required by Lithops, execute:

```