Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLI] Show worker creation timestamp and ttd in lithops worker list #1275

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions lithops/scripts/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ def list_jobs(config, backend, region, debug):
job_list = compute_handler.list_jobs()

headers = job_list.pop(0)
key_index = headers.index("Submitted")

try:
import pytz
Expand All @@ -711,11 +712,11 @@ def convert_utc_to_local(utc_timestamp):
return local_time.strftime('%Y-%m-%d %H:%M:%S %Z')

for row in job_list:
row[2] = convert_utc_to_local(row[2])
row[key_index] = convert_utc_to_local(row[key_index])
except ModuleNotFoundError:
pass

sorted_data = sorted(job_list, key=lambda x: x[2])
sorted_data = sorted(job_list, key=lambda x: x[key_index])

print()
print(tabulate(sorted_data, headers=headers))
Expand Down Expand Up @@ -773,8 +774,28 @@ def list_workers(config, backend, region, debug):
worker_list = compute_handler.list_workers()

headers = worker_list.pop(0)
key_index = headers.index("Created")

try:
import pytz
from tzlocal import get_localzone
local_tz = get_localzone()

def convert_utc_to_local(utc_timestamp):
utc_time = datetime.strptime(utc_timestamp, '%Y-%m-%d %H:%M:%S %Z')
utc_time = utc_time.replace(tzinfo=pytz.utc)
local_time = utc_time.astimezone(local_tz)
return local_time.strftime('%Y-%m-%d %H:%M:%S %Z')

for row in worker_list:
row[key_index] = convert_utc_to_local(row[key_index])
except ModuleNotFoundError:
pass

sorted_data = sorted(worker_list, key=lambda x: x[key_index])

print()
print(tabulate(worker_list, headers=headers))
print(tabulate(sorted_data, headers=headers))
print(f'\nTotal workers: {len(worker_list)}')


Expand Down
14 changes: 9 additions & 5 deletions lithops/standalone/keeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, config, instance_data, stop_callback=None, delete_callback=No

self.runing = False
self.jobs = {}
self.time_to_dismantle = self.hard_dismantle_timeout

self.standalone_handler = StandaloneHandler(self.standalone_config)
self.instance = self.standalone_handler.backend.get_instance(**instance_data)
Expand All @@ -53,6 +54,9 @@ def __init__(self, config, instance_data, stop_callback=None, delete_callback=No
f"instance ID: {self.instance.instance_id}")
logger.debug(f"Delete {self.instance.name} on dismantle: {self.instance.delete_on_dismantle}")

def get_time_to_dismantle(self):
return self.time_to_dismantle

def add_job(self, job_key):
self.last_usage_time = time.time()
self.jobs[job_key] = JobStatus.RUNNING.value
Expand Down Expand Up @@ -91,14 +95,14 @@ def run(self):

time_since_last_usage = time.time() - self.last_usage_time

time_to_dismantle = int(self.soft_dismantle_timeout - time_since_last_usage)
self.time_to_dismantle = int(self.soft_dismantle_timeout - time_since_last_usage)
else:
time_to_dismantle = int(self.hard_dismantle_timeout - time_since_last_usage)
self.time_to_dismantle = int(self.hard_dismantle_timeout - time_since_last_usage)
jobs_running = True

if time_to_dismantle > 0:
logger.debug(f"Time to dismantle: {time_to_dismantle} seconds")
check_interval = min(60, max(time_to_dismantle / 10, 1))
if self.time_to_dismantle > 0:
logger.debug(f"Time to dismantle: {self.time_to_dismantle} seconds")
check_interval = min(60, max(self.time_to_dismantle / 10, 1))
time.sleep(check_interval)
else:
self.stop_instance()
Expand Down
35 changes: 29 additions & 6 deletions lithops/standalone/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ def is_worker_free(worker_private_ip):
return False


def get_worker_ttd(worker_private_ip):
"""
Checks if the Lithops service is ready and free in the worker VM instance
"""
url = f"http://{worker_private_ip}:{SA_WORKER_SERVICE_PORT}/ttd"
try:
r = requests.get(url, timeout=0.5)
logger.debug(f'Worker TTD from {worker_private_ip}: {r.text}')
return r.text
except Exception:
return "Unknown"


@app.route('/worker/list', methods=['GET'])
def list_workers():
"""
Expand All @@ -103,17 +116,25 @@ def list_workers():

budget_keeper.last_usage_time = time.time()

result = [['Worker Name', 'Instance Type', 'Processes', 'Runtime', 'Execution Mode', 'Status']]
result = [['Worker Name', 'Created', 'Instance Type', 'Processes', 'Runtime', 'Execution Mode', 'Status', 'TTD']]

for worker in redis_client.keys('worker:*'):
def get_worker(worker):
worker_data = redis_client.hgetall(worker)
name = worker_data['name']
status = worker_data['status']
private_ip = worker_data['private_ip']
ttd = get_worker_ttd(private_ip) + "s"
timestamp = float(worker_data['created'])
created = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S UTC')
instance_type = worker_data['instance_type']
worker_processes = str(worker_data['worker_processes'])
exec_mode = worker_data['exec_mode']
runtime = worker_data['runtime']
result.append((name, instance_type, worker_processes, runtime, exec_mode, status))
result.append((name, created, instance_type, worker_processes, runtime, exec_mode, status, ttd))

workers = redis_client.keys('worker:*')
with ThreadPoolExecutor(len(workers)) as ex:
ex.map(get_worker, workers)

logger.debug(f"workers: {result}")
return flask.jsonify(result)
Expand Down Expand Up @@ -197,6 +218,7 @@ def setup_worker(standalone_handler, worker_info, work_queue_name):
'instance_id': worker.instance_id,
'instance_type': instance_type,
'worker_processes': worker_processes,
'created': str(time.time()),
'ssh_credentials': json.dumps(worker.ssh_credentials),
'err': "", **config,
})
Expand Down Expand Up @@ -325,7 +347,7 @@ def handle_workers(job_payload, workers, work_queue_name):
# Jobs
# /---------------------------------------------------------------------------/

def kill_job_process(job_key_list):
def cancel_job_process(job_key_list):
"""
Cleans the work queues and sends the SIGTERM to the workers
"""
Expand Down Expand Up @@ -355,7 +377,8 @@ def stop_task(worker):
ex.map(stop_task, workers)

Path(os.path.join(JOBS_DIR, job_key + '.done')).touch()
redis_client.hset(f"job:{job_key}", 'status', JobStatus.KILLED.value)
if redis_client.hget(f"job:{job_key}", 'status') != JobStatus.DONE.value:
redis_client.hset(f"job:{job_key}", 'status', JobStatus.CANCELED.value)


@app.route('/job/stop', methods=['POST'])
Expand All @@ -366,7 +389,7 @@ def stop():
job_key_list = flask.request.get_json(force=True, silent=True)
# Start a separate thread to do the task in background,
# for not keeping the client waiting.
Thread(target=kill_job_process, args=(job_key_list, )).start()
Thread(target=cancel_job_process, args=(job_key_list, )).start()

return ('', 204)

Expand Down
6 changes: 4 additions & 2 deletions lithops/standalone/standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import uuid
import json
import time
import hashlib
import logging
import importlib
import requests
Expand Down Expand Up @@ -234,8 +235,9 @@ def create_workers(workers_to_create):
futures = []
with cf.ThreadPoolExecutor(min(workers_to_create, 48)) as ex:
for vm_n in range(workers_to_create):
worker_id = "{:04d}".format(vm_n)
name = f'lithops-worker-{executor_id}-{job_id}-{worker_id}'
worker_id = f"{executor_id}-{job_id}-{vm_n}"
worker_hash = hashlib.sha1(worker_id.encode("utf-8")).hexdigest()[:8]
name = f'lithops-worker-{worker_hash}'
futures.append(ex.submit(self.backend.create_worker, name))

for future in cf.as_completed(futures):
Expand Down
2 changes: 1 addition & 1 deletion lithops/standalone/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class JobStatus(Enum):
PENDING = "pending"
RUNNING = "running"
DONE = 'done'
KILLED = 'killed'
CANCELED = 'canceled'


class LithopsValidationError(Exception):
Expand Down
6 changes: 6 additions & 0 deletions lithops/standalone/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ def ping():
return response


@app.route('/ttd', methods=['GET'])
def ttd():
ttd = budget_keeper.get_time_to_dismantle()
return str(ttd), 200


@app.route('/stop/<job_key>', methods=['POST'])
def stop(job_key):
logger.debug(f'Received SIGTERM: Stopping job process {job_key}')
Expand Down
Loading