From 6112d8de4340edb28428a04eef7793ef9ea65a78 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Wed, 15 Jun 2022 00:35:11 +0000 Subject: [PATCH 1/7] Migrate retry_requests to urllib3 PoolManager --- skyplane/cli/cli_impl/cp_replicate.py | 4 +- skyplane/cli/cli_internal.py | 8 +++- skyplane/compute/server.py | 11 +++--- skyplane/gateway/gateway_sender.py | 43 ++++++++++++--------- skyplane/obj_store/gcs_interface.py | 3 +- skyplane/obj_store/s3_interface.py | 2 +- skyplane/replicate/replicator_client.py | 51 ++++++++++++++----------- skyplane/utils/net.py | 12 ------ 8 files changed, 71 insertions(+), 63 deletions(-) delete mode 100644 skyplane/utils/net.py diff --git a/skyplane/cli/cli_impl/cp_replicate.py b/skyplane/cli/cli_impl/cp_replicate.py index 399629df8..2ab733b58 100644 --- a/skyplane/cli/cli_impl/cp_replicate.py +++ b/skyplane/cli/cli_impl/cp_replicate.py @@ -244,6 +244,7 @@ def launch_replication_job( reuse_gateways: bool = False, use_bbr: bool = False, use_compression: bool = False, + verify_checksums: bool = True, # cloud provider specific options aws_instance_class: str = "m5.8xlarge", azure_instance_class: str = "Standard_D32_v4", @@ -314,7 +315,8 @@ def launch_replication_job( signal.signal(signal.SIGINT, s) # verify transfer - rc.verify_transfer(job) + if verify_checksums: + rc.verify_transfer(job) stats = stats if stats else {} stats["success"] = stats["monitor_status"] == "completed" diff --git a/skyplane/cli/cli_internal.py b/skyplane/cli/cli_internal.py index 5224fc240..5aba1ff11 100644 --- a/skyplane/cli/cli_internal.py +++ b/skyplane/cli/cli_internal.py @@ -24,6 +24,7 @@ def replicate_random( chunk_size_mb: int = typer.Option(8, "--chunk-size-mb", help="Chunk size in MB."), use_bbr: bool = typer.Option(True, help="If true, will use BBR congestion control"), reuse_gateways: bool = False, + debug: bool = False, ): """Replicate objects from remote object store to another remote object store.""" print_header() @@ -69,10 +70,11 @@ def replicate_random( stats = launch_replication_job( topo=topo, job=job, - debug=False, + debug=debug, reuse_gateways=reuse_gateways, use_bbr=use_bbr, use_compression=False, + verify_checksums=False, ) return 0 if stats["success"] else 1 @@ -95,6 +97,7 @@ def replicate_random_solve( skyplane_root / "profiles" / "throughput.csv", "--throughput-grid", help="Throughput grid file" ), solver_verbose: bool = False, + debug: bool = False, ): """Replicate objects from remote object store to another remote object store.""" print_header() @@ -153,9 +156,10 @@ def replicate_random_solve( stats = launch_replication_job( topo=topo, job=job, - debug=False, + debug=debug, reuse_gateways=reuse_gateways, use_bbr=use_bbr, use_compression=False, + verify_checksums=False, ) return 0 if stats["success"] else 1 diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py index c9afd4f64..ec75d3e38 100644 --- a/skyplane/compute/server.py +++ b/skyplane/compute/server.py @@ -8,12 +8,11 @@ from pathlib import Path from typing import Dict, Optional, Tuple -import paramiko +import urllib3 from skyplane import config_path, key_root from skyplane.compute.const_cmds import make_autoshutdown_script, make_dozzle_command, make_sysctl_tcp_tuning_command from skyplane.utils import logger from skyplane.utils.fn import PathLike, wait_for -from skyplane.utils.net import retry_requests from skyplane.utils.retry import retry_backoff from skyplane.utils.timer import Timer @@ -312,14 +311,16 @@ def check_stderr(tup): self.gateway_api_url = f"http://127.0.0.1:{self.tunnel_port(8080 + 1)}" # wait for gateways to start (check status API) + http_pool = urllib3.PoolManager() + def is_api_ready(): try: api_url = f"{self.gateway_api_url}/api/v1/status" - status_val = retry_requests().get(api_url) - is_up = status_val.json().get("status") == "ok" + status_val = json.loads(http_pool.request("GET", api_url).data.decode("utf-8")) + is_up = status_val.get("status") == "ok" return is_up except Exception as e: - logger.error(f"{desc_prefix}: Failed to check gateway status: {e}") + logger.fs.error(f"{desc_prefix}: Failed to check gateway status: {e}") return False try: diff --git a/skyplane/gateway/gateway_sender.py b/skyplane/gateway/gateway_sender.py index 09e25fb9c..ebc5f31cd 100644 --- a/skyplane/gateway/gateway_sender.py +++ b/skyplane/gateway/gateway_sender.py @@ -1,3 +1,4 @@ +import json import queue import socket import ssl @@ -6,16 +7,15 @@ from functools import partial from multiprocessing import Event, Process, Queue from typing import Dict, List, Optional +import urllib3 import lz4.frame -import requests from skyplane import MB from skyplane.chunk import ChunkRequest from skyplane.gateway.chunk_store import ChunkStore from skyplane.utils import logger from skyplane.utils.retry import retry_backoff -from skyplane.utils.net import retry_requests from skyplane.utils.timer import Timer @@ -30,6 +30,7 @@ def __init__( use_tls: bool = True, use_compression: bool = True, ): + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) self.region = region self.chunk_store = chunk_store self.error_event = error_event @@ -58,6 +59,7 @@ def __init__( self.destination_ports: Dict[str, int] = {} # ip_address -> int self.destination_sockets: Dict[str, socket.socket] = {} # ip_address -> socket self.sent_chunk_ids: Dict[str, List[int]] = {} # ip_address -> list of chunk_ids + self.http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3), cert_reqs="CERT_NONE") def start_workers(self): for ip, num_connections in self.outgoing_ports.items(): @@ -103,9 +105,9 @@ def worker_loop(self, worker_id: int, dest_ip: str): def wait_for_chunks(): cr_status = {} for ip, ip_chunk_ids in self.sent_chunk_ids.items(): - response = retry_requests().get(f"https://{ip}:8080/api/v1/incomplete_chunk_requests", verify=False) - assert response.status_code == 200, f"{response.status_code} {response.text}" - host_state = response.json()["chunk_requests"] + response = self.http_pool.request("GET", f"https://{ip}:8080/api/v1/incomplete_chunk_requests") + assert response.status == 200, f"{response.status_code} {response.data}" + host_state = json.loads(response.data.decode("utf-8"))["chunk_requests"] for chunk_id in ip_chunk_ids: if chunk_id in host_state: cr_status[chunk_id] = host_state[chunk_id]["state"] @@ -125,17 +127,17 @@ def wait_for_chunks(): # close servers logger.info(f"[sender:{worker_id}] exiting, closing servers") for dst_host, dst_port in self.destination_ports.items(): - response = retry_requests().delete(f"https://{dst_host}:8080/api/v1/servers/{dst_port}", verify=False) - assert response.status_code == 200 and response.json() == {"status": "ok"}, response.json() + response = self.http_pool.request("DELETE", f"https://{dst_host}:8080/api/v1/servers/{dst_port}") + assert response.status == 200 and json.loads(response.data.decode("utf-8")) == {"status": "ok"} logger.info(f"[sender:{worker_id}] closed destination socket {dst_host}:{dst_port}") def queue_request(self, chunk_request: ChunkRequest): self.worker_queue.put(chunk_request.chunk.chunk_id) def make_socket(self, dst_host): - response = retry_requests().post(f"https://{dst_host}:8080/api/v1/servers", verify=False) - assert response.status_code == 200, f"{response.status_code} {response.text}" - self.destination_ports[dst_host] = int(response.json()["server_port"]) + response = self.http_pool.request("POST", f"https://{dst_host}:8080/api/v1/servers") + assert response.status == 200, f"{response.status} {response.data.decode('utf-8')}" + self.destination_ports[dst_host] = int(json.loads(response.data.decode("utf-8"))["server_port"]) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((dst_host, self.destination_ports[dst_host])) original_timeout = sock.gettimeout() @@ -152,14 +154,19 @@ def make_socket(self, dst_host): def send_chunks(self, chunk_ids: List[int], dst_host: str): """Send list of chunks to gateway server, pipelining small chunks together into a single socket stream.""" # notify server of upcoming ChunkRequests - logger.debug(f"[sender:{self.worker_id}]:{chunk_ids} pre-registering chunks") - chunk_reqs = [self.chunk_store.get_chunk_request(chunk_id) for chunk_id in chunk_ids] - post_req = lambda: retry_requests().post( - f"https://{dst_host}:8080/api/v1/chunk_requests", json=[c.as_dict() for c in chunk_reqs], verify=False - ) - response = retry_backoff(post_req, exception_class=requests.exceptions.ConnectionError) - assert response.status_code == 200 and response.json()["status"] == "ok" - logger.debug(f"[sender:{self.worker_id}]:{chunk_ids} registered chunks") + with Timer(f"prepare to pre-register chunks {chunk_ids} to {dst_host}"): + logger.debug(f"[sender:{self.worker_id}]:{chunk_ids} pre-registering chunks") + chunk_reqs = [self.chunk_store.get_chunk_request(chunk_id) for chunk_id in chunk_ids] + register_body = json.dumps([c.as_dict() for c in chunk_reqs]).encode("utf-8") + with Timer(f"pre-register chunks {chunk_ids} to {dst_host}"): + response = self.http_pool.request( + "POST", + f"https://{dst_host}:8080/api/v1/chunk_requests", + body=register_body, + headers={"Content-Type": "application/json"}, + ) + assert response.status == 200 and json.loads(response.data.decode("utf-8")).get("status") == "ok" + logger.debug(f"[sender:{self.worker_id}]:{chunk_ids} registered chunks") # contact server to set up socket connection if self.destination_ports.get(dst_host) is None: diff --git a/skyplane/obj_store/gcs_interface.py b/skyplane/obj_store/gcs_interface.py index 93a982b59..37930bae5 100644 --- a/skyplane/obj_store/gcs_interface.py +++ b/skyplane/obj_store/gcs_interface.py @@ -25,6 +25,7 @@ def __init__(self, bucket_name, gcp_region="infer", create_bucket=False): self.auth = GCPAuthentication() # self.auth.set_service_account_credentials("skyplane1") # use service account credentials self._gcs_client = self.auth.get_storage_client() + self._requests_session = requests.Session() try: self.gcp_region = self.infer_gcp_region(bucket_name) if gcp_region is None or gcp_region == "infer" else gcp_region if not self.bucket_exists(): @@ -139,7 +140,7 @@ def send_xml_request( req = requests.Request(method, url, headers=headers) prepared = req.prepare() - response = requests.Session().send(prepared) + response = self._requests_session.send(prepared) if not response.ok: raise ValueError(f"Invalid status code {response.status_code}: {response.text}") diff --git a/skyplane/obj_store/s3_interface.py b/skyplane/obj_store/s3_interface.py index 5fb103c68..cdee0886a 100644 --- a/skyplane/obj_store/s3_interface.py +++ b/skyplane/obj_store/s3_interface.py @@ -24,7 +24,7 @@ def __init__(self, bucket_name, aws_region="infer", create_bucket=False): try: self.aws_region = self.infer_s3_region(bucket_name) if aws_region is None or aws_region == "infer" else aws_region if not self.bucket_exists(): - raise exceptions.MissingBucketException() + raise exceptions.MissingBucketException(f"Bucket {bucket_name} does not exist") except exceptions.MissingBucketException: if create_bucket: assert aws_region is not None and aws_region != "infer", "Must specify AWS region when creating bucket" diff --git a/skyplane/replicate/replicator_client.py b/skyplane/replicate/replicator_client.py index 535f5e903..67bf53272 100644 --- a/skyplane/replicate/replicator_client.py +++ b/skyplane/replicate/replicator_client.py @@ -1,15 +1,14 @@ import json import pickle -from re import S import time import uuid from datetime import datetime from functools import partial from typing import Dict, List, Optional, Tuple, Iterable -from numpy import source import pandas as pd from rich.progress import Progress, SpinnerColumn, TextColumn, TimeRemainingColumn, DownloadColumn, BarColumn, TransferSpeedColumn +import urllib3 from skyplane import GB, MB, tmp_log_dir from skyplane import exceptions @@ -25,7 +24,6 @@ from skyplane.replicate.replication_plan import ReplicationJob, ReplicationTopology, ReplicationTopologyGateway from skyplane.utils import logger from skyplane.utils.fn import PathLike, do_parallel -from skyplane.utils.net import retry_requests from skyplane.utils.timer import Timer @@ -39,6 +37,7 @@ def __init__( gcp_instance_class: Optional[str] = "n2-standard-16", # set to None to disable GCP gcp_use_premium_network: bool = True, ): + self.http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3)) self.topology = topology self.gateway_docker_image = gateway_docker_image self.aws_instance_class = aws_instance_class @@ -420,12 +419,15 @@ def send_chunk_requests(args: Tuple[Server, List[ChunkRequest]]): hop_instance, chunk_requests = args while chunk_requests: batch, chunk_requests = chunk_requests[: 1024 * 16], chunk_requests[1024 * 16 :] - reply = retry_requests().post( - f"{hop_instance.gateway_api_url}/api/v1/chunk_requests", json=[c.as_dict() for c in batch] + reply = self.http_pool.request( + "POST", + f"{hop_instance.gateway_api_url}/api/v1/chunk_requests", + body=json.dumps([c.as_dict() for c in batch]).encode("utf-8"), + headers={"Content-Type": "application/json"}, ) - if reply.status_code != 200: + if reply.status != 200: raise Exception( - f"Failed to send chunk requests to gateway instance {hop_instance.instance_name()}: {reply.text}" + f"Failed to send chunk requests to gateway instance {hop_instance.instance_name()}: {reply.data.decode('utf-8')}" ) logger.fs.debug( f"Sent {len(batch)} chunk requests to {hop_instance.instance_name()}, {len(chunk_requests)} remaining" @@ -440,11 +442,13 @@ def send_chunk_requests(args: Tuple[Server, List[ChunkRequest]]): def get_chunk_status_log_df(self) -> pd.DataFrame: def get_chunk_status(args): node, instance = args - reply = retry_requests().get(f"{instance.gateway_api_url}/api/v1/chunk_status_log") - if reply.status_code != 200: - raise Exception(f"Failed to get chunk status from gateway instance {instance.instance_name()}: {reply.text}") + reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/chunk_status_log") + if reply.status != 200: + raise Exception( + f"Failed to get chunk status from gateway instance {instance.instance_name()}: {reply.data.decode('utf-8')}" + ) logs = [] - for log_entry in reply.json()["chunk_status_log"]: + for log_entry in json.loads(reply.data.decode("utf-8"))["chunk_status_log"]: log_entry["region"] = node.region log_entry["instance"] = node.instance log_entry["time"] = datetime.fromisoformat(log_entry["time"]) @@ -460,10 +464,10 @@ def get_chunk_status(args): def check_error_logs(self) -> Dict[str, List[str]]: def get_error_logs(args): _, instance = args - reply = retry_requests().get(f"{instance.gateway_api_url}/api/v1/errors") - if reply.status_code != 200: - raise Exception(f"Failed to get error logs from gateway instance {instance.instance_name()}: {reply.text}") - return reply.json()["errors"] + reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/errors") + if reply.status != 200: + raise Exception(f"Failed to get error logs from gateway instance {instance.instance_name()}: {reply.data.decode('utf-8')}") + return json.loads(reply.data.decode("utf-8"))["errors"] errors: Dict[str, List[str]] = {} for (_, instance), result in do_parallel(get_error_logs, self.bound_nodes.items(), n=-1): @@ -604,9 +608,9 @@ def complete_upload(req): progress.update(cleanup_task, description=": Getting compression ratio information") total_sent_compressed, total_sent_uncompressed = 0, 0 for gateway in {v for v in self.bound_nodes.values() if v.region_tag in source_regions}: - stats = retry_requests().get(f"{gateway.gateway_api_url}/api/v1/profile/compression") - if stats.status_code == 200: - stats = stats.json() + stats = self.http_pool.request("GET", f"{gateway.gateway_api_url}/api/v1/profile/compression") + if stats.status == 200: + stats = json.loads(stats.data.decode("utf-8")) total_sent_compressed += stats.get("compressed_bytes_sent", 0) total_sent_uncompressed += stats.get("uncompressed_bytes_sent", 0) compression_ratio = total_sent_compressed / total_sent_uncompressed if total_sent_uncompressed > 0 else 0 @@ -636,12 +640,13 @@ def copy_log(instance): if write_socket_profile: def write_socket_profile(instance): - receiver_reply = retry_requests().get(f"{instance.gateway_api_url}/api/v1/profile/socket/receiver") - if receiver_reply.status_code != 200: + receiver_reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/profile/socket/receiver") + text = receiver_reply.data.decode("utf-8") + if receiver_reply.status != 200: logger.fs.error( - f"Failed to get receiver socket profile from {instance.gateway_api_url}: {receiver_reply.status_code} {receiver_reply.text}" + f"Failed to get receiver socket profile from {instance.gateway_api_url}: {receiver_reply.status} {text}" ) - (self.transfer_dir / f"receiver_socket_profile_{instance.uuid()}.json").write_text(receiver_reply.text) + (self.transfer_dir / f"receiver_socket_profile_{instance.uuid()}.json").write_text(text) progress.update(cleanup_task, description=": Writing socket profiles") do_parallel(write_socket_profile, self.bound_nodes.values(), n=-1) @@ -649,7 +654,7 @@ def write_socket_profile(instance): def fn(s: Server): try: - retry_requests().post(f"{s.gateway_api_url}/api/v1/shutdown") + self.http_pool.request("POST", f"{s.gateway_api_url}/api/v1/shutdown") except: return # ignore connection errors since server may be shutting down diff --git a/skyplane/utils/net.py b/skyplane/utils/net.py deleted file mode 100644 index b84a82859..000000000 --- a/skyplane/utils/net.py +++ /dev/null @@ -1,12 +0,0 @@ -import requests -from requests.adapters import HTTPAdapter -from urllib3 import Retry - - -def retry_requests(connect=3, backoff_factor=0.1): - session = requests.Session() - retry = Retry(connect=connect, backoff_factor=backoff_factor) - adapter = HTTPAdapter(max_retries=retry) - session.mount("http://", adapter) - session.mount("https://", adapter) - return session From 03b2aa3633207acfb558d66f252d67bb79b64fc0 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Wed, 15 Jun 2022 02:01:39 +0000 Subject: [PATCH 2/7] Start implementing stunnel --- Dockerfile | 22 +++++++++++++----- skyplane/compute/server.py | 2 +- skyplane/gateway/gateway_daemon_api.py | 31 +++++++++++--------------- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/Dockerfile b/Dockerfile index fc0d37259..03338d2b3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,22 @@ # syntax=docker/dockerfile:1 FROM python:3.10-slim +# install apt packages +RUN --mount=type=cache,target=/var/cache/apt apt update \ + && apt-get install --no-install-recommends -y curl ca-certificates stunnel4 \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# configure stunnel +RUN mkdir -p /etc/stunnel \ + && mkdir -p /usr/local/var/run \ + && echo "client = no" >> /etc/stunnel/stunnel.conf \ + && echo "pid = /usr/local/var/run/stunnel.pid" >> /etc/stunnel/stunnel.conf \ + && echo "[gateway]" >> /etc/stunnel/stunnel.conf \ + && echo "accept = 8080" >> /etc/stunnel/stunnel.conf \ + && echo "connect = 127.0.0.1:8081" >> /etc/stunnel/stunnel.conf \ + && /etc/init.d/stunnel4 start + # increase number of open files and concurrent TCP connections RUN (echo 'net.ipv4.ip_local_port_range = 12000 65535' >> /etc/sysctl.conf) \ && (echo 'fs.file-max = 1048576' >> /etc/sysctl.conf) \ @@ -10,12 +26,6 @@ RUN (echo 'net.ipv4.ip_local_port_range = 12000 65535' >> /etc/sysctl.conf) \ && (echo 'root soft nofile 1048576' >> /etc/security/limits.conf) \ && (echo 'root hard nofile 1048576' >> /etc/security/limits.conf) -# install apt packages -RUN --mount=type=cache,target=/var/cache/apt apt update \ - && apt-get install --no-install-recommends -y curl ca-certificates \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* - # install gateway COPY scripts/requirements-gateway.txt /tmp/requirements-gateway.txt RUN --mount=type=cache,target=/root/.cache/pip pip3 install --no-cache-dir -r /tmp/requirements-gateway.txt && rm -r /tmp/requirements-gateway.txt diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py index ec75d3e38..5db4561ee 100644 --- a/skyplane/compute/server.py +++ b/skyplane/compute/server.py @@ -302,6 +302,7 @@ def check_stderr(tup): docker_run_flags += " " + " ".join(f"--env {k}={v}" for k, v in docker_envs.items()) gateway_daemon_cmd = f"python -u /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag} {'--use-compression' if use_compression else ''}" docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skyplane_gateway {gateway_docker_image} {gateway_daemon_cmd}" + logger.fs.info(f"{desc_prefix}: {docker_launch_cmd}") start_out, start_err = self.run_command(docker_launch_cmd) logger.fs.debug(desc_prefix + f": Gateway started {start_out.strip()}") assert not start_err.strip(), f"Error starting gateway: {start_err.strip()}" @@ -320,7 +321,6 @@ def is_api_ready(): is_up = status_val.get("status") == "ok" return is_up except Exception as e: - logger.fs.error(f"{desc_prefix}: Failed to check gateway status: {e}") return False try: diff --git a/skyplane/gateway/gateway_daemon_api.py b/skyplane/gateway/gateway_daemon_api.py index c7ffbb326..f16eac406 100644 --- a/skyplane/gateway/gateway_daemon_api.py +++ b/skyplane/gateway/gateway_daemon_api.py @@ -1,7 +1,10 @@ +from distutils.spawn import spawn +from gevent import monkey +monkey.patch_all() + import logging import logging.handlers import os -import ssl import threading from multiprocessing import Queue from queue import Empty @@ -9,6 +12,7 @@ from typing import Dict, List from flask import Flask, jsonify, request +from gevent.pywsgi import WSGIServer from werkzeug.serving import make_server from skyplane.chunk import ChunkRequest, ChunkState @@ -38,11 +42,10 @@ def __init__( error_event, error_queue: Queue, host="0.0.0.0", - port=8080, + port=8081, ): super().__init__() self.app = Flask("gateway_metadata_server") - self.app_http = Flask("gateway_metadata_server_http") self.chunk_store = chunk_store self.gateway_receiver = gateway_receiver self.error_event = error_event @@ -51,19 +54,16 @@ def __init__( self.error_list_lock = threading.Lock() # load routes - for app in (self.app, self.app_http): - self.register_global_routes(app) - self.register_server_routes(app) - self.register_request_routes(app) - self.register_error_routes(app) - self.register_socket_profiling_routes(app) + self.register_global_routes(self.app) + self.register_server_routes(self.app) + self.register_request_routes(self.app) + self.register_error_routes(self.app) + self.register_socket_profiling_routes(self.app) # make server self.host = host self.port = port - self.port_http = port + 1 - self.url = "https://{}:{}".format(host, port) - self.url_http = "http://{}:{}".format(host, self.port_http) + self.url = "http://{}:{}".format(host, port) # chunk status log self.chunk_status_log: List[Dict] = [] @@ -76,18 +76,13 @@ def __init__( self.receiver_socket_profiles_lock = threading.Lock() logging.getLogger("werkzeug").setLevel(logging.WARNING) - self.server = make_server(host, port, self.app, threaded=True, ssl_context="adhoc") - self.server_http = make_server(host, self.port_http, self.app_http, threaded=True) + self.server = make_server(host, port, self.app, threaded=True) def run(self): - self.server_http_thread = threading.Thread(target=self.server_http.serve_forever) - self.server_http_thread.start() self.server.serve_forever() def shutdown(self): self.server.shutdown() - self.server_http.shutdown() - self.server_http_thread.join() def register_global_routes(self, app): # index route returns API version From ddd92e15be07c8e2e27d24713c84e899e35eac5a Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Wed, 15 Jun 2022 03:31:30 +0000 Subject: [PATCH 3/7] Revert to werkzeug and launch stunnel on container launch --- Dockerfile | 13 ++++++++----- skyplane/compute/server.py | 2 +- skyplane/gateway/gateway_daemon_api.py | 5 ----- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/Dockerfile b/Dockerfile index 03338d2b3..5f2b4a79d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,13 +9,16 @@ RUN --mount=type=cache,target=/var/cache/apt apt update \ # configure stunnel RUN mkdir -p /etc/stunnel \ - && mkdir -p /usr/local/var/run \ + && openssl genrsa -out key.pem 2048 \ + && openssl req -new -x509 -key key.pem -out cert.pem -days 1095 -subj "/C=US/ST=California/L=San Francisco" \ + && cat key.pem cert.pem >> /etc/stunnel/stunnel.pem \ + && rm key.pem cert.pem \ + && mkdir -p /usr/local/var/run/ \ && echo "client = no" >> /etc/stunnel/stunnel.conf \ - && echo "pid = /usr/local/var/run/stunnel.pid" >> /etc/stunnel/stunnel.conf \ && echo "[gateway]" >> /etc/stunnel/stunnel.conf \ && echo "accept = 8080" >> /etc/stunnel/stunnel.conf \ - && echo "connect = 127.0.0.1:8081" >> /etc/stunnel/stunnel.conf \ - && /etc/init.d/stunnel4 start + && echo "connect = 8081" >> /etc/stunnel/stunnel.conf \ + && echo "cert = /etc/stunnel/stunnel.pem" >> /etc/stunnel/stunnel.conf # increase number of open files and concurrent TCP connections RUN (echo 'net.ipv4.ip_local_port_range = 12000 65535' >> /etc/sysctl.conf) \ @@ -34,4 +37,4 @@ WORKDIR /pkg COPY . . RUN pip3 install --no-dependencies -e ".[gateway]" -CMD ["python3", "skyplane/gateway/gateway_daemon.py"] \ No newline at end of file +CMD /etc/init.d/stunnel4 start && python3 skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{}' --region local \ No newline at end of file diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py index 5db4561ee..3d6d73921 100644 --- a/skyplane/compute/server.py +++ b/skyplane/compute/server.py @@ -300,7 +300,7 @@ def check_stderr(tup): docker_run_flags += f" -v /tmp/{service_key_file}:/pkg/data/{service_key_file}" docker_run_flags += " " + " ".join(f"--env {k}={v}" for k, v in docker_envs.items()) - gateway_daemon_cmd = f"python -u /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag} {'--use-compression' if use_compression else ''}" + gateway_daemon_cmd = f"/etc/init.d/stunnel4 start & && python -u /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag} {'--use-compression' if use_compression else ''}" docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skyplane_gateway {gateway_docker_image} {gateway_daemon_cmd}" logger.fs.info(f"{desc_prefix}: {docker_launch_cmd}") start_out, start_err = self.run_command(docker_launch_cmd) diff --git a/skyplane/gateway/gateway_daemon_api.py b/skyplane/gateway/gateway_daemon_api.py index f16eac406..1c91531b9 100644 --- a/skyplane/gateway/gateway_daemon_api.py +++ b/skyplane/gateway/gateway_daemon_api.py @@ -1,7 +1,3 @@ -from distutils.spawn import spawn -from gevent import monkey -monkey.patch_all() - import logging import logging.handlers import os @@ -12,7 +8,6 @@ from typing import Dict, List from flask import Flask, jsonify, request -from gevent.pywsgi import WSGIServer from werkzeug.serving import make_server from skyplane.chunk import ChunkRequest, ChunkState From f3a2e9715c3952a24fc3e3b21efb46c1ab319830 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Wed, 15 Jun 2022 03:41:13 +0000 Subject: [PATCH 4/7] Update launch script --- Dockerfile | 2 +- skyplane/compute/server.py | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5f2b4a79d..efa42fe55 100644 --- a/Dockerfile +++ b/Dockerfile @@ -37,4 +37,4 @@ WORKDIR /pkg COPY . . RUN pip3 install --no-dependencies -e ".[gateway]" -CMD /etc/init.d/stunnel4 start && python3 skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{}' --region local \ No newline at end of file +CMD /etc/init.d/stunnel4 start; python3 /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{}' --region local \ No newline at end of file diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py index 3d6d73921..20b0faf50 100644 --- a/skyplane/compute/server.py +++ b/skyplane/compute/server.py @@ -300,12 +300,12 @@ def check_stderr(tup): docker_run_flags += f" -v /tmp/{service_key_file}:/pkg/data/{service_key_file}" docker_run_flags += " " + " ".join(f"--env {k}={v}" for k, v in docker_envs.items()) - gateway_daemon_cmd = f"/etc/init.d/stunnel4 start & && python -u /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag} {'--use-compression' if use_compression else ''}" + gateway_daemon_cmd = f"\"/etc/init.d/stunnel4 start && python -u /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag} {'--use-compression' if use_compression else ''}\"" docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skyplane_gateway {gateway_docker_image} {gateway_daemon_cmd}" logger.fs.info(f"{desc_prefix}: {docker_launch_cmd}") start_out, start_err = self.run_command(docker_launch_cmd) logger.fs.debug(desc_prefix + f": Gateway started {start_out.strip()}") - assert not start_err.strip(), f"Error starting gateway: {start_err.strip()}" + assert not start_err.strip(), f"Error starting gateway:\n{start_out.strip()}\n{start_err.strip()}" gateway_container_hash = start_out.strip().split("\n")[-1][:12] self.gateway_log_viewer_url = f"http://127.0.0.1:{self.tunnel_port(8888)}/container/{gateway_container_hash}" @@ -331,9 +331,6 @@ def is_api_ready(): logger.fs.warning(desc_prefix + " gateway launch command: " + docker_launch_cmd) logs, err = self.run_command(f"sudo docker logs skyplane_gateway --tail=100") logger.fs.error(f"Docker logs: {logs}\nerr: {err}") - - out, err = self.run_command(docker_launch_cmd.replace(" -d ", " ")) - logger.fs.error(f"Relaunching gateway in foreground\nout: {out}\nerr: {err}") logger.fs.exception(e) raise e finally: From 9ccaa862750e3af23e9c48e65b92ac613f9a1af9 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Wed, 15 Jun 2022 03:42:08 +0000 Subject: [PATCH 5/7] Run in bash --- skyplane/compute/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py index 20b0faf50..df0037573 100644 --- a/skyplane/compute/server.py +++ b/skyplane/compute/server.py @@ -300,7 +300,7 @@ def check_stderr(tup): docker_run_flags += f" -v /tmp/{service_key_file}:/pkg/data/{service_key_file}" docker_run_flags += " " + " ".join(f"--env {k}={v}" for k, v in docker_envs.items()) - gateway_daemon_cmd = f"\"/etc/init.d/stunnel4 start && python -u /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag} {'--use-compression' if use_compression else ''}\"" + gateway_daemon_cmd = f"/bin/bash -c \"/etc/init.d/stunnel4 start && python -u /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag} {'--use-compression' if use_compression else ''}\"" docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skyplane_gateway {gateway_docker_image} {gateway_daemon_cmd}" logger.fs.info(f"{desc_prefix}: {docker_launch_cmd}") start_out, start_err = self.run_command(docker_launch_cmd) From caf8e7e6d6517ab2e845c2867232fd1fd02381e5 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Wed, 15 Jun 2022 03:46:54 +0000 Subject: [PATCH 6/7] Escape bash launch command --- skyplane/compute/server.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py index df0037573..b716f192a 100644 --- a/skyplane/compute/server.py +++ b/skyplane/compute/server.py @@ -300,8 +300,9 @@ def check_stderr(tup): docker_run_flags += f" -v /tmp/{service_key_file}:/pkg/data/{service_key_file}" docker_run_flags += " " + " ".join(f"--env {k}={v}" for k, v in docker_envs.items()) - gateway_daemon_cmd = f"/bin/bash -c \"/etc/init.d/stunnel4 start && python -u /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag} {'--use-compression' if use_compression else ''}\"" - docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skyplane_gateway {gateway_docker_image} {gateway_daemon_cmd}" + gateway_daemon_cmd = f"/etc/init.d/stunnel4 start && python -u /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag} {'--use-compression' if use_compression else ''}" + escaped_gateway_daemon_cmd = gateway_daemon_cmd.replace('"', '\\"') + docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skyplane_gateway {gateway_docker_image} /bin/bash -c \"{escaped_gateway_daemon_cmd}\"" logger.fs.info(f"{desc_prefix}: {docker_launch_cmd}") start_out, start_err = self.run_command(docker_launch_cmd) logger.fs.debug(desc_prefix + f": Gateway started {start_out.strip()}") From 4e871d3fecce7bd7e6dffa96d78a8daf784e3620 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Wed, 15 Jun 2022 03:48:45 +0000 Subject: [PATCH 7/7] Format code --- skyplane/compute/server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/skyplane/compute/server.py b/skyplane/compute/server.py index b716f192a..21f930aac 100644 --- a/skyplane/compute/server.py +++ b/skyplane/compute/server.py @@ -302,7 +302,9 @@ def check_stderr(tup): docker_run_flags += " " + " ".join(f"--env {k}={v}" for k, v in docker_envs.items()) gateway_daemon_cmd = f"/etc/init.d/stunnel4 start && python -u /pkg/skyplane/gateway/gateway_daemon.py --chunk-dir /skyplane/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag} {'--use-compression' if use_compression else ''}" escaped_gateway_daemon_cmd = gateway_daemon_cmd.replace('"', '\\"') - docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skyplane_gateway {gateway_docker_image} /bin/bash -c \"{escaped_gateway_daemon_cmd}\"" + docker_launch_cmd = ( + f'sudo docker run {docker_run_flags} --name skyplane_gateway {gateway_docker_image} /bin/bash -c "{escaped_gateway_daemon_cmd}"' + ) logger.fs.info(f"{desc_prefix}: {docker_launch_cmd}") start_out, start_err = self.run_command(docker_launch_cmd) logger.fs.debug(desc_prefix + f": Gateway started {start_out.strip()}")