Skip to content

Commit

Permalink
Enhance private cloud deployment support (#520)
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin-Molinero authored Nov 22, 2024
1 parent 1b9246e commit 013bb6c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 45 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1831,9 +1831,9 @@ Options:
--master Run in master mode
--slave Run in slave mode
--token TEXT The master server token
--master-ip TEXT The master server ip address
--master-domain TEXT The master server domain
--master-port INTEGER The master server port
--slave-ip TEXT The slave server ip address
--slave-domain TEXT The slave server domain
--update Pull the latest image before starting
--no-update Do not update to the latest version
--compute TEXT Compute configuration to use
Expand Down
87 changes: 48 additions & 39 deletions lean/commands/private_cloud/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,18 @@
from lean.constants import COMPUTE_MASTER, COMPUTE_SLAVE, COMPUTE_MESSAGING


def get_free_port():
from socket import socket
for i in range(0, 3):
try:
port = 32787 + i
with socket() as s:
s.bind(('', port))
return port
except:
pass
return 0


def deploy(ip: str, port: int, token: str, slave: bool, update: bool, no_update: bool,
def deploy(target_master_domain: str, self_domain: str, port: int, token: str, slave: bool, update: bool, no_update: bool,
image: str, lean_config: dict, extra_docker_config: str, counter: int = 0):
logger = container.logger

compute_node_name = f"{COMPUTE_SLAVE}{counter}" if slave else COMPUTE_MASTER
logger.info(f"Starting {compute_node_name}...")
compute_directory = Path(f"~/.lean/compute/{compute_node_name}").expanduser()
lean_config["node-name"] = compute_node_name

run_options = container.lean_runner.get_basic_docker_config_without_algo(lean_config, None, True, None, None,
None, compute_directory)
run_options["environment"]["AIRLOCK"] = compute_directory
run_options["mounts"].append(Mount(target="/QuantConnect/platform-services/airlock",
source=str(compute_directory), type="bind"))
run_options["mounts"].append(Mount(target="/var/run/docker.sock", source="/var/run/docker.sock",
Expand All @@ -59,17 +48,31 @@ def deploy(ip: str, port: int, token: str, slave: bool, update: bool, no_update:
type="bind", read_only=True))
container.lean_runner.parse_extra_docker_config(run_options, loads(extra_docker_config))

if not image:
image = "quantconnect/platform-services:latest"

is_domain = not self_domain.replace('.', '').isnumeric()
if not slave:
run_options["ports"]["9696"] = str(port)
run_options["ports"]["9697"] = str(get_free_port())
if not is_domain:
run_options["ports"]["9696"] = str(port)
run_options["ports"]["9697"] = str(0)

root_directory = container.lean_config_manager.get_cli_root_directory()
run_options["volumes"][str(root_directory)] = {"bind": "/LeanCLIWorkspace", "mode": "rw"}

if is_domain:
labels = {}
for name, value in container.docker_manager.get_image_labels(image):
if slave and name == "slave" or not slave and name == "master":
for key, label in loads(value).items():
labels[key] = label.replace("{{domain}}", self_domain)
run_options["labels"] = labels

run_options["remove"] = False
run_options["name"] = compute_node_name
run_options["environment"]["MODE"] = str('slave') if slave else str('master')
run_options["environment"]["IP"] = str(ip)
run_options["environment"]["MASTER_DOMAIN"] = str(target_master_domain)
run_options["environment"]["SELF_DOMAIN"] = str(self_domain)
run_options["environment"]["PORT"] = str(port)
run_options["environment"]["TOKEN"] = str(token)
run_options["user"] = "root"
Expand Down Expand Up @@ -103,9 +106,9 @@ def get_ip_address():
@option("--master", is_flag=True, default=False, help="Run in master mode")
@option("--slave", is_flag=True, default=False, help="Run in slave mode")
@option("--token", type=str, required=False, help="The master server token")
@option("--master-ip", type=str, required=False, help="The master server ip address")
@option("--master-port", type=int, required=False, default=0, help="The master server port")
@option("--slave-ip", type=str, required=False, help="The slave server ip address")
@option("--master-domain", type=str, required=False, help="The master server domain")
@option("--master-port", type=int, required=False, default=443, help="The master server port")
@option("--slave-domain", type=str, required=False, help="The slave server domain")
@option("--update", is_flag=True, default=False, help="Pull the latest image before starting")
@option("--no-update", is_flag=True, default=False, help="Do not update to the latest version")
@option("--compute", type=str, required=False, help="Compute configuration to use")
Expand All @@ -115,8 +118,8 @@ def get_ip_address():
def start(master: bool,
slave: bool,
token: str,
master_ip: str,
slave_ip: str,
master_domain: str,
slave_domain: str,
master_port: int,
update: bool,
no_update: bool,
Expand All @@ -135,9 +138,9 @@ def start(master: bool,
# just default to slave if none given
slave = True

if not master_ip:
master_ip = get_ip_address()
logger.info(f"'--master-ip' was not provided using '{master_ip}'")
if not master_domain:
master_domain = get_ip_address()
logger.info(f"'--master-domain' was not provided using '{master_domain}'")

str_mode = 'slave' if slave else 'master'
logger.info(f'Start running in {str_mode} mode')
Expand All @@ -154,9 +157,7 @@ def start(master: bool,

if slave:
if not token:
raise RuntimeError(f"Master token is required when running as slave")
if master_port == 0:
raise RuntimeError(f"Master port is required when running as slave")
raise RuntimeError(f"Master token '--token' is required when running as slave")
else:
if not token:
from uuid import uuid4
Expand All @@ -166,41 +167,49 @@ def start(master: bool,
if any(docker_container):
names = [node.name for node in docker_container if node.status == 'running']
if master and (COMPUTE_MASTER in names or COMPUTE_MESSAGING in names):
raise RuntimeError(f"Private cloud nodes already running detected: {names}")
raise RuntimeError(f"Private cloud nodes already running, please use '--stop'. Detected: {names}")
logger.info(f"Running nodes: {names}")

container.temp_manager.delete_temporary_directories_when_done = False
lean_config = container.lean_config_manager.get_complete_lean_config(None, None, None)

master_is_domain = not master_domain.replace('.', '').isnumeric()
if master:
deploy(master_ip, master_port, token, False, update, no_update, image, lean_config, extra_docker_config)
master_port_option = f" --master-port {master_port}"
if master_is_domain:
slave_domain = master_domain
master_port_option = ''
deploy(master_domain, master_domain, master_port, token, False, update, no_update, image,
lean_config, extra_docker_config)
if master_port == 0:
master_port = container.docker_manager.get_container_port(COMPUTE_MASTER, "9696/tcp")
logger.info(f"Slaves can be added running: "
f"lean private-cloud start --slave --master-ip {master_ip} --token \"{token}\" --master-port {master_port}")

logger.info(f"Slaves can be added running: lean private-cloud start --slave --master-domain {master_domain}"
f" --slave-domain {{slave.domain}} --token \"{token}\"{master_port_option}")

compute_index = len(get_private_cloud_containers([COMPUTE_SLAVE]))
if compute:
logger.debug(f"Starting given compute configuration: {compute}")

if not slave_ip:
logger.debug(f"'slave-ip' was not given will try to figure it out...")
if not slave_domain:
logger.debug(f"'slave-domain' was not given will try to figure it out...")
retry_count = 0
while retry_count < 10:
retry_count += 1
try:
from requests import get
resp = get(f'http://{master_ip}:{master_port}', stream=True)
slave_ip = resp.raw._connection.sock.getsockname()[0]
resp = get(f'http://{master_domain}:{master_port}', stream=True)
slave_domain = resp.raw._connection.sock.getsockname()[0]
break
except Exception as e:
from time import sleep
sleep(1)
pass
lean_config["self-ip-address"] = slave_ip
logger.info(f"Using ip address '{slave_ip}' as own")
lean_config["self-ip-address"] = slave_domain
logger.debug(f"Using address '{slave_domain}' as own")

for configuration in compute:
lean_config["compute"] = configuration
for i in range(compute_index, int(configuration["count"]) + compute_index):
deploy(master_ip, master_port, token, True, update, no_update, image, lean_config, extra_docker_config, i)
deploy(master_domain, slave_domain, master_port, token, True, update, no_update, image,
lean_config, extra_docker_config, i)
10 changes: 6 additions & 4 deletions lean/components/docker/docker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ def __init__(self, logger: Logger, temp_manager: TempManager, platform_manager:
self._temp_manager = temp_manager
self._platform_manager = platform_manager

def get_image_label(self, image: DockerImage, label: str, default: str) -> str:
docker_image = self._get_docker_client().images.get(str(image))
def get_image_labels(self, image: str) -> str:
docker_image = self._get_docker_client().images.get(image)
return docker_image.labels.items()

for name, value in docker_image.labels.items():
def get_image_label(self, image: DockerImage, label: str, default: str) -> str:
for name, value in self.get_image_labels(str(image)):
if name == label:
self._logger.debug(f"Label '{label}' found in image '{image.name}', value {value}")
return value
Expand Down Expand Up @@ -179,7 +181,7 @@ def run_image(self, image: DockerImage, **kwargs) -> bool:
from time import sleep
i = 0
self._logger.info(f'Verifying deployment \'{container.name}\' is stable...')
while i < 35:
while i < 60:
i += 1
container.reload()
if (container.status != "running" and container.attrs and "State" in container.attrs and "ExitCode"
Expand Down

0 comments on commit 013bb6c

Please sign in to comment.