Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process prepare-computation messages #5

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions amqp_messager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@

class AMQPMessager(MessagingHandler):
def __init__(self, server, receiver_queues, sender_queue, task_queue,
result_queue):
result_queue, preparation_queue):
super(AMQPMessager, self).__init__()
self.server = server
self.receiver_queues = receiver_queues
self.sender_queue = sender_queue
self.tasks = task_queue
self.results = result_queue
self.preparations = preparation_queue

def on_start(self, event):
conn = event.container.connect(self.server)
Expand All @@ -28,8 +29,10 @@ def on_start(self, event):

def on_message(self, event):
# ToDO: ignore duplicate message
# print(event.message.body)
self.tasks.put(event.message.body)
if event.message.address != "preparations":
self.tasks.put(event.message.body)
elif event.message.address == "preparations":
self.preparations.put(event.message.body)

def on_result(self, event):
# check if we are finished
Expand Down
68 changes: 45 additions & 23 deletions backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import re
import json
import tempfile
from xml.etree.ElementPath import prepare_star
import docker
import requests
import time
Expand All @@ -34,6 +35,7 @@ def __init__(self, config_file):
print("Using env AMQPServer %s"%os.getenv('AMQPServer'))
self.tasks = multiprocessing.Queue(3)
self.results = multiprocessing.Queue()
self.preparations = multiprocessing.Queue()
self.running_computations = {}
self.client = docker.from_env()
# ToDO: store errors and send them within result-message back
Expand All @@ -46,7 +48,8 @@ def __init__(self, config_file):
self.config.getlist("AMQP", "computationqueues"),
self.config["AMQP"]["resultqueue"],
self.tasks,
self.results))
self.results,
self.preparations))
self.messager_process = multiprocessing.Process(target=messager.run)
self.messager_process.start()

Expand Down Expand Up @@ -103,6 +106,20 @@ def main(self):
comp2trash.append(key)
for key in comp2trash:
del self.running_computations[key]

# prepare containers if available
# TODO also prepare matlab
try:
prepare_task = self.preparations.get(block=True, timeout=1)
except Empty:
pass
else:
print("--- Got preparation task. ---")
json_prepare_task = json.loads(prepare_task)
tmp_dir, files = self._prepare_all_environments(json_prepare_task)
if json_prepare_task["environment"] == "Container":
comp_conf = ConfigurationContainerSchema().load(json_prepare_task["configuration"])
self.load_image(comp_conf, tmp_dir.name)

def _prepare_all_environments(self, computation):
# create tmp-dir for this computation and store files there
Expand Down Expand Up @@ -137,7 +154,33 @@ def _prepare_container_backend(self, computation, tmp_dir):
# ToDO: create in-between status messages for frontend
comp_conf = ConfigurationContainerSchema().load(
computation["configuration"])

# load image
image_filename, image_id = self.load_image(comp_conf, tmp_dir)

# create container
if comp_conf["volume"] is not None:
print("Creating volume ...")
volume = self.client.volumes.create(labels={"computation": computation['identifier'].hex})

print("Creating container ...")
container = self.client.containers.create(
image_id,
command=comp_conf["command_line_arguments"],
auto_remove=False,
cpu_quota=100000*comp_conf["num_cpus"],
detach=True,
entrypoint=comp_conf["entrypoint"],
mem_limit=comp_conf["memory"],
mounts=[Mount(comp_conf["volume"],volume.id)] \
if comp_conf["volume"] is not None else None)
print("... Done.")
if comp_conf["volume"] is not None:
return container, image_filename, volume
else:
return container, image_filename, None

def load_image(self, comp_conf, tmp_dir):
image_filename = None
image_uri = comp_conf["image"]
if image_uri.startswith("file"):
Expand Down Expand Up @@ -175,27 +218,7 @@ def _prepare_container_backend(self, computation, tmp_dir):
with open(os.path.join(tmp_dir, image_filename), 'rb') as bf:
image_id = self.client.images.load(bf)[0].id

# create container
if comp_conf["volume"] is not None:
print("Creating volume ...")
volume = self.client.volumes.create(labels={"computation": computation['identifier'].hex})

print("Creating container ...")
container = self.client.containers.create(
image_id,
command=comp_conf["command_line_arguments"],
auto_remove=False,
cpu_quota=100000*comp_conf["num_cpus"],
detach=True,
entrypoint=comp_conf["entrypoint"],
mem_limit=comp_conf["memory"],
mounts=[Mount(comp_conf["volume"],volume.id)] \
if comp_conf["volume"] is not None else None)
print("... Done.")
if comp_conf["volume"] is not None:
return container, image_filename, volume
else:
return container, image_filename, None
return image_filename, image_id

def copy_to_container(self, ip_add, basepath, files):
print(basepath, files)
Expand All @@ -210,7 +233,6 @@ def add_mimetypes(self):
mimetypes.add_type("application/x-vgf", ".vgf")
mimetypes.add_type("application/x-vgf3", ".vgf3")
mimetypes.add_type("application/x-vgfc", ".vgfc")


class ResultStreamer(Thread):
def __init__(self, stream, tmp_dir, files, result_queue, computation_id, sidekick):
Expand Down
2 changes: 1 addition & 1 deletion config.sample.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ KeepContainer = no # yes for debugging

[AMQP]
Server = localhost:5672
ComputationQueues = ["computations", "computations/container"]
ComputationQueues = ["computations", "computations/container", "preparations"]
ResultQueue = results