From 1477dd21df187bbe32c70e61a4b8fd1043bb0646 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Fri, 31 Dec 2021 08:33:41 +0000 Subject: [PATCH 1/9] Direct replication test --- environment.yml | 3 +- setup.py | 1 + skylark/gateway/chunk_store.py | 99 ++++----- skylark/gateway/gateway_api.py | 152 -------------- skylark/gateway/gateway_daemon.py | 111 +++++++++- skylark/gateway/gateway_daemon_api.py | 194 ++++++++++++++++++ .../{gateway.py => gateway_reciever.py} | 63 ++---- skylark/gateway/gateway_sender.py | 125 +++++++++++ skylark/gateway/wire_protocol_header.py | 14 +- skylark/replicate/obj_store.py | 19 +- skylark/replicate/replicator_client.py | 22 +- skylark/test/test_gateway_daemon.py | 49 +++++ skylark/test/test_null_replicator.py | 23 --- .../test_replicator_client.py | 0 14 files changed, 572 insertions(+), 303 deletions(-) delete mode 100644 skylark/gateway/gateway_api.py create mode 100644 skylark/gateway/gateway_daemon_api.py rename skylark/gateway/{gateway.py => gateway_reciever.py} (64%) create mode 100644 skylark/gateway/gateway_sender.py create mode 100644 skylark/test/test_gateway_daemon.py delete mode 100644 skylark/test/test_null_replicator.py rename skylark/{replicate => test}/test_replicator_client.py (100%) diff --git a/environment.yml b/environment.yml index b3713a5ef..a17ba21f0 100644 --- a/environment.yml +++ b/environment.yml @@ -23,4 +23,5 @@ dependencies: - paramiko - questionary - ray - - tqdm \ No newline at end of file + - tqdm + - werkzeug \ No newline at end of file diff --git a/setup.py b/setup.py index 668c1e0c6..7fb0bab6c 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,7 @@ "paramiko", "questionary", "tqdm", + "werkzeug", ], extras_require={"test": ["black", "pytest", "ipython", "jupyter_console"]} ) \ No newline at end of file diff --git a/skylark/gateway/chunk_store.py b/skylark/gateway/chunk_store.py index 1ce2ac287..bc5ac86ad 100644 --- a/skylark/gateway/chunk_store.py +++ b/skylark/gateway/chunk_store.py @@ -1,6 +1,7 @@ from dataclasses import asdict, dataclass -from enum import Enum +from enum import Enum, auto from multiprocessing import Manager +from os import PathLike from pathlib import Path from typing import Dict, List, Optional, Tuple @@ -70,16 +71,20 @@ def from_dict(in_dict: Dict): class ChunkState(Enum): - REGISTERED = "registered" - DOWNLOAD_IN_PROGRESS = "download_in_progress" - READY_TO_UPLOAD = "ready_to_upload" - UPLOAD_IN_PROGRESS = "upload_in_progress" - UPLOAD_COMPLETE = "upload_complete" - FAILED = "failed" + registered = auto() + download_in_progress = auto() + ready_to_upload = auto() + upload_in_progress = auto() + upload_complete = auto() + failed = auto() + + @staticmethod + def from_str(s: str): + return ChunkState[s.lower()] class ChunkStore: - def __init__(self, chunk_dir: str = "/dev/shm/skylark/chunks"): + def __init__(self, chunk_dir: PathLike): self.chunk_dir = Path(chunk_dir) self.chunk_dir.mkdir(parents=True, exist_ok=True) @@ -90,13 +95,9 @@ def __init__(self, chunk_dir: str = "/dev/shm/skylark/chunks"): # multiprocess-safe concurrent structures self.manager = Manager() - self.chunks: Dict[int, Chunk] = self.manager.dict() + self.chunk_requests: Dict[int, Chunk] = self.manager.dict() self.chunk_status: Dict[int, ChunkState] = self.manager.dict() - self.pending_chunk_requests: List[ChunkRequest] = self.manager.list() - self.downloaded_chunk_requests: List[ChunkRequest] = self.manager.list() - self.uploaded_chunk_requests: List[ChunkRequest] = self.manager.list() - def get_chunk_file_path(self, chunk_id: int) -> Path: return self.chunk_dir / f"{chunk_id}.chunk" @@ -110,80 +111,50 @@ def set_chunk_status(self, chunk_id: int, new_status: ChunkState): self.chunk_status[chunk_id] = new_status def start_download(self, chunk_id: int): - if self.get_chunk_status(chunk_id) == ChunkState.REGISTERED: - self.set_chunk_status(chunk_id, ChunkState.DOWNLOAD_IN_PROGRESS) + if self.get_chunk_status(chunk_id) == ChunkState.registered: + self.set_chunk_status(chunk_id, ChunkState.download_in_progress) else: raise ValueError(f"Invalid transition start_download from {self.get_chunk_status(chunk_id)}") def finish_download(self, chunk_id: int, runtime_s: Optional[float] = None): # todo log runtime to statistics store - if self.get_chunk_status(chunk_id) == ChunkState.DOWNLOAD_IN_PROGRESS: - self.set_chunk_status(chunk_id, ChunkState.READY_TO_UPLOAD) + if self.get_chunk_status(chunk_id) == ChunkState.download_in_progress: + self.set_chunk_status(chunk_id, ChunkState.ready_to_upload) else: raise ValueError(f"Invalid transition finish_download from {self.get_chunk_status(chunk_id)}") def start_upload(self, chunk_id: int): - if self.get_chunk_status(chunk_id) == ChunkState.READY_TO_UPLOAD: - self.set_chunk_status(chunk_id, ChunkState.UPLOAD_IN_PROGRESS) + if self.get_chunk_status(chunk_id) == ChunkState.ready_to_upload: + self.set_chunk_status(chunk_id, ChunkState.upload_in_progress) else: raise ValueError(f"Invalid transition start_upload from {self.get_chunk_status(chunk_id)}") def finish_upload(self, chunk_id: int, runtime_s: Optional[float] = None): # todo log runtime to statistics store - if self.get_chunk_status(chunk_id) == ChunkState.UPLOAD_IN_PROGRESS: - self.set_chunk_status(chunk_id, ChunkState.UPLOAD_COMPLETE) + if self.get_chunk_status(chunk_id) == ChunkState.upload_in_progress: + self.set_chunk_status(chunk_id, ChunkState.upload_complete) else: raise ValueError(f"Invalid transition finish_upload from {self.get_chunk_status(chunk_id)}") def fail(self, chunk_id: int): - if self.get_chunk_status(chunk_id) != ChunkState.UPLOAD_COMPLETE: - self.set_chunk_status(chunk_id, ChunkState.FAILED) + if self.get_chunk_status(chunk_id) != ChunkState.upload_complete: + self.set_chunk_status(chunk_id, ChunkState.failed) else: raise ValueError(f"Invalid transition fail from {self.get_chunk_status(chunk_id)}") ### # Chunk management ### - def get_chunks(self): - return self.chunks.values() - - def get_chunk(self, chunk_id: int) -> Optional[Chunk]: - return self.chunks[chunk_id] if chunk_id in self.chunks else None - - def add_chunk(self, chunk: Chunk): - self.chunks[chunk.chunk_id] = chunk - self.set_chunk_status(chunk.chunk_id, "registered") - - def get_chunk_requests(self) -> Tuple[List[ChunkRequest], List[ChunkRequest], List[ChunkRequest]]: - return list(self.pending_chunk_requests), list(self.downloaded_chunk_requests), list(self.uploaded_chunk_requests) + def get_chunk_requests(self, status: Optional[ChunkState] = None) -> List[ChunkRequest]: + if status is None: + return list(self.chunk_requests.values()) + else: + return [req for i, req in self.chunk_requests.items() if self.get_chunk_status(i) == status] def get_chunk_request(self, chunk_id: int) -> Optional[ChunkRequest]: - for chunk_request in self.pending_chunk_requests: - if chunk_request.chunk.chunk_id == chunk_id: - return chunk_request - for chunk_request in self.downloaded_chunk_requests: - if chunk_request.chunk.chunk_id == chunk_id: - return chunk_request - for chunk_request in self.uploaded_chunk_requests: - if chunk_request.chunk.chunk_id == chunk_id: - return chunk_request - return None - - def add_chunk_request_pending(self, chunk_request: ChunkRequest): - self.pending_chunk_requests.append(chunk_request) - - def add_chunk_request_downloaded(self, chunk_request: ChunkRequest): - self.downloaded_chunk_requests.append(chunk_request) - - def add_chunk_request_uploaded(self, chunk_request: ChunkRequest): - self.uploaded_chunk_requests.append(chunk_request) - - def mark_chunk_request_downloaded(self, chunk_request: ChunkRequest): - assert chunk_request in self.pending_chunk_requests - self.pending_chunk_requests.remove(chunk_request) - self.downloaded_chunk_requests.append(chunk_request) - - def mark_chunk_request_uploaded(self, chunk_request: ChunkRequest): - assert chunk_request in self.downloaded_chunk_requests - self.downloaded_chunk_requests.remove(chunk_request) - self.uploaded_chunk_requests.append(chunk_request) + return self.chunk_requests[chunk_id] if chunk_id in self.chunk_requests else None + + def add_chunk_request(self, chunk_request: ChunkRequest, state=ChunkState.registered): + logger.debug(f"Adding chunk request {chunk_request.chunk.chunk_id}") + self.set_chunk_status(chunk_request.chunk.chunk_id, state) + self.chunk_requests[chunk_request.chunk.chunk_id] = chunk_request diff --git a/skylark/gateway/gateway_api.py b/skylark/gateway/gateway_api.py deleted file mode 100644 index 6e3b6178c..000000000 --- a/skylark/gateway/gateway_api.py +++ /dev/null @@ -1,152 +0,0 @@ -from flask import Flask, jsonify, request - -from skylark.gateway.chunk_store import ChunkRequest -from skylark.gateway.gateway import Gateway - - -class GatewayMetadataServer: - """ - API documentation: - * GET /api/v1/status - returns status of API - * GET /api/v1/servers - returns list of running servers - * POST /api/v1/servers - starts a new server - * DELETE /api/v1/servers/ - stops a server - * GET /api/v1/chunks - returns list of chunks - * GET /api/v1/chunks/ - returns chunk details - * GET /api/v1/chunk_requests - returns list of pending chunk requests - * GET /api/v1/chunk_requests/ - returns chunk request details - * POST /api/v1/chunk_requests - adds a new chunk request to end of pending requests - """ - - def __init__(self, gateway: Gateway): - self.app = Flask("gateway_metadata_server") - self.gateway = gateway - self.register_global_routes() - self.register_server_routes() - self.register_chunk_routes() - self.register_request_routes() - - def run(self, host="0.0.0.0", port=8080): - self.app.run(host=host, port=port, debug=True) - - def register_global_routes(self): - # index route returns API version - @self.app.route("/", methods=["GET"]) - def get_index(): - return jsonify({"version": "v1"}) - - # index for v1 api routes - @self.app.route("/api/v1", methods=["GET"]) - def get_v1_index(): - return jsonify({"version": "v1"}) - - # status route returns if API is up - @self.app.route("/api/v1/status", methods=["GET"]) - def get_status(): - return jsonify({"status": "ok"}) - - def register_server_routes(self): - # list running gateway servers w/ ports - @self.app.route("/api/v1/servers", methods=["GET"]) - def get_server_ports(): - return jsonify({"server_ports": self.gateway.server_ports}) - - # add a new server - @self.app.route("/api/v1/servers", methods=["POST"]) - def add_server(): - new_port = self.gateway.start_server() - return jsonify({"server_port": new_port}) - - # remove a server - @self.app.route("/api/v1/servers/", methods=["DELETE"]) - def remove_server(port: int): - try: - self.gateway.stop_server(port) - return jsonify({"status": "ok"}) - except ValueError as e: - return jsonify({"error": str(e)}), 400 - - def register_chunk_routes(self): - # list chunks - @self.app.route("/api/v1/chunks", methods=["GET"]) - def get_chunks(): - reply = {} - for chunk_data in self.gateway.chunk_store.get_chunks(): - chunk_id = chunk_data.chunk_id - reply[chunk_id] = chunk_data.copy() - return jsonify(reply) - - # get chunk details - @self.app.route("/api/v1/chunks/", methods=["GET"]) - def get_chunk(chunk_id: int): - if chunk_id in self.gateway.chunks: - return jsonify(dict(self.gateway.chunk_store.get_chunk(chunk_id))) - else: - return jsonify({"error": f"Chunk {chunk_id} not found"}), 404 - - def register_request_routes(self): - # list all chunk requests - @self.app.route("/api/v1/chunk_requests", methods=["GET"]) - def get_all_chunk_requests(): - pending, downloaded, uploaded = self.gateway.chunk_store.get_chunk_requests() - return jsonify({"pending": pending, "downloaded": downloaded, "uploaded": uploaded}) - - # list pending chunk requests - @self.app.route("/api/v1/chunk_requests/pending", methods=["GET"]) - def get_pending_chunk_requests(): - pending = self.gateway.chunk_store.get_chunk_requests()[0] - return jsonify({"pending": pending}) - - # list downloaded chunk requests - @self.app.route("/api/v1/chunk_requests/downloaded", methods=["GET"]) - def get_downloaded_chunk_requests(): - downloaded = self.gateway.chunk_store.get_chunk_requests()[1] - return jsonify({"downloaded": downloaded}) - - # list uploaded chunk requests - @self.app.route("/api/v1/chunk_requests/uploaded", methods=["GET"]) - def get_uploaded_chunk_requests(): - uploaded = self.gateway.chunk_store.get_chunk_requests()[2] - return jsonify({"uploaded": uploaded}) - - # lookup chunk request given chunk id - @self.app.route("/api/v1/chunk_requests/", methods=["GET"]) - def get_chunk_request(chunk_id: int): - chunk_req = self.gateway.chunk_store.get_chunk_request(chunk_id) - if chunk_req is None: - return jsonify({"error": f"Chunk {chunk_id} not found"}), 404 - else: - return jsonify(chunk_req) - - # add a new chunk request to end of pending requests - @self.app.route("/api/v1/chunk_requests/pending", methods=["POST"]) - def add_chunk_request(): - if isinstance(request.json, dict): - self.gateway.chunk_store.add_chunk_request_pending(ChunkRequest.from_dict(request.json)) - return jsonify({"status": "ok"}) - elif isinstance(request.json, list): - for chunk_req in request.json: - self.gateway.chunk_store.add_chunk_request_pending(ChunkRequest.from_dict(chunk_req)) - return jsonify({"status": "ok"}) - - # add a new chunk request to end of downloaded requests - @self.app.route("/api/v1/chunk_requests/downloaded", methods=["POST"]) - def add_downloaded_chunk_request(): - if isinstance(request.json, dict): - self.gateway.chunk_store.add_chunk_request_downloaded(ChunkRequest.from_dict(request.json)) - return jsonify({"status": "ok"}) - elif isinstance(request.json, list): - for chunk_req in request.json: - self.gateway.chunk_store.add_chunk_request_downloaded(ChunkRequest.from_dict(chunk_req)) - return jsonify({"status": "ok"}) - - # add a new chunk request to end of uploaded requests - @self.app.route("/api/v1/chunk_requests/uploaded", methods=["POST"]) - def add_uploaded_chunk_request(): - if isinstance(request.json, dict): - self.gateway.chunk_store.add_chunk_request_uploaded(ChunkRequest.from_dict(request.json)) - return jsonify({"status": "ok"}) - elif isinstance(request.json, list): - for chunk_req in request.json: - self.gateway.chunk_store.add_chunk_request_uploaded(ChunkRequest.from_dict(chunk_req)) - return jsonify({"status": "ok"}) diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index 3f02e6070..f499c5cfe 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -1,7 +1,108 @@ -from skylark.gateway.gateway import Gateway -from skylark.gateway.gateway_api import GatewayMetadataServer +import argparse +import atexit +from multiprocessing import Event +import os +import re +import signal +from os import PathLike +from pathlib import Path +import sys +import time +from typing import Optional + +from loguru import logger +from skylark.gateway.chunk_store import ChunkRequest, ChunkState, ChunkStore + +from skylark.gateway.gateway_reciever import GatewayReciever +from skylark.gateway.gateway_daemon_api import GatewayDaemonAPI +from skylark.gateway.gateway_sender import GatewaySender + + +class GatewayDaemon: + def __init__(self, chunk_dir: PathLike, debug=False, log_dir: Optional[PathLike] = None, outgoing_connections=1, outgoing_batch_size=1): + if log_dir is not None: + log_dir = Path(log_dir) + log_dir.mkdir(exist_ok=True) + logger.add(log_dir / "gateway_daemon.log", rotation="10 MB") + logger.add(sys.stderr, level="DEBUG" if debug else "INFO") + self.chunk_store = ChunkStore(chunk_dir) + self.gateway_reciever = GatewayReciever(chunk_store=self.chunk_store) + self.gateway_sender = GatewaySender(chunk_store=self.chunk_store, n_processes=outgoing_connections, batch_size=outgoing_batch_size) + + # API server + self.api_server = GatewayDaemonAPI(self.chunk_store, self.gateway_reciever, debug=debug, log_dir=log_dir) + self.api_server.start() + atexit.register(self.cleanup) + logger.info("Gateway daemon API started") + + def cleanup(self): + logger.warning("Shutting down gateway daemon") + self.api_server.shutdown() + + def run(self): + exit_flag = Event() + + def exit_handler(signum, frame): + logger.warning("Received signal {}. Exiting...".format(signum)) + exit_flag.set() + self.gateway_reciever.stop_servers() + self.gateway_sender.stop_workers() + sys.exit(0) + + logger.info("Starting gateway sender workers") + self.gateway_sender.start_workers() + signal.signal(signal.SIGINT, exit_handler) + signal.signal(signal.SIGTERM, exit_handler) + + logger.info("Starting daemon loop") + while not exit_flag.is_set(): + # queue object uploads and relays + for chunk_req in self.chunk_store.get_chunk_requests(ChunkState.ready_to_upload): + logger.info(f"Relaying chunk {chunk_req.chunk.chunk_id}") + if len(chunk_req.path) > 0: + current_hop = chunk_req.path[0] + if current_hop.chunk_location_type == "dst_object_store": + logger.warning(f"NOT IMPLEMENTED: Queuing object store upload for chunk {chunk_req.chunk_id}") + elif current_hop.chunk_location_type == "relay": + logger.info(f"Queuing chunk {chunk_req.chunk.chunk_id} for relay") + self.gateway_sender.queue_request(chunk_req) + self.chunk_store.start_upload(chunk_req.chunk.chunk_id) + else: + raise ValueError(f"Unknown or incorrect chunk_location_type {current_hop.chunk_location_type}") + else: + logger.error(f"Ready to upload chunk {chunk_req.chunk_id} has no hops") + + # queue object store downloads and relays (if space is available) + # todo ensure space is available + for chunk_req in self.chunk_store.get_chunk_requests(ChunkState.registered): + logger.info(f"Downloading chunk {chunk_req.chunk.chunk_id}") + if len(chunk_req.path) > 0: + current_hop = chunk_req.path[0] + if current_hop.chunk_location_type == "src_object_store": + logger.warning(f"NOT IMPLEMENTED: Queuing object store download for chunk {chunk_req.chunk_id}") + elif current_hop.chunk_location_type.startswith("random-"): + self.chunk_store.start_download(chunk_req.chunk.chunk_id) + size_mb = re.search(r"random-(\d+)MB", current_hop.chunk_location_type).group(1) + logger.info(f"Generating {size_mb}MB random chunk {chunk_req.chunk.chunk_id}") + with self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).open("wb") as f: + f.write(os.urandom(int(size_mb) * 1024 * 1024)) + self.chunk_store.finish_download(chunk_req.chunk.chunk_id) + else: + raise ValueError(f"Unknown or incorrect chunk_location_type {current_hop.chunk_location_type}") + else: + logger.error(f"Registered chunk {chunk_req.chunk_id} has no hops") + + time.sleep(0.5) + if __name__ == "__main__": - gateway = Gateway() - gateway_server = GatewayMetadataServer(gateway) - gateway_server.run() + parser = argparse.ArgumentParser(description="Skylark Gateway Daemon") + parser.add_argument("--chunk-dir", type=Path, default="/dev/shm/skylark/chunks", required=True, help="Directory to store chunks") + parser.add_argument("--debug", action="store_true", help="Enable debug mode for Flask") + parser.add_argument("--log-dir", type=Path, default=Path("/var/log/skylark"), help="Directory to write logs to") + parser.add_argument("--outgoing-connections", type=int, default=1, help="Number of outgoing connections to make to the next relay") + args = parser.parse_args() + daemon = GatewayDaemon( + chunk_dir=args.chunk_dir, debug=args.debug, log_dir=Path(args.log_dir), outgoing_connections=args.outgoing_connections + ) + daemon.run() diff --git a/skylark/gateway/gateway_daemon_api.py b/skylark/gateway/gateway_daemon_api.py new file mode 100644 index 000000000..18e3d31ea --- /dev/null +++ b/skylark/gateway/gateway_daemon_api.py @@ -0,0 +1,194 @@ +import logging +import logging.handlers +import os +from pathlib import Path +import threading +from typing import List + +from flask import Flask, jsonify, request, send_from_directory +from werkzeug.serving import make_server + +from skylark.gateway.chunk_store import ChunkRequest, ChunkState, ChunkStore +from skylark.gateway.gateway_reciever import GatewayReciever + + +class GatewayDaemonAPI(threading.Thread): + """ + API documentation: + * GET /api/v1/status - returns status of API + * GET /api/v1/servers - returns list of running servers + * POST /api/v1/servers - starts a new server + * DELETE /api/v1/servers/ - stops a server + * GET /api/v1/chunk_requests - returns list of chunk requests (use {'state': ''} to filter) + * GET /api/v1/chunk_requests/ - returns chunk request + * POST /api/v1/chunk_requests - adds a new chunk request + * PUT /api/v1/chunk_requests/ - updates chunk request + """ + + def __init__(self, chunk_store: ChunkStore, gateway_reciever: GatewayReciever, host="0.0.0.0", port=8080, debug=False, log_dir=None): + super().__init__() + self.app = Flask("gateway_metadata_server") + self.chunk_store = chunk_store + self.gateway_reciever = gateway_reciever + + # load routes + self.register_global_routes() + self.register_server_routes() + self.register_request_routes() + + # make server + self.log_dir = log_dir + if log_dir is not None: + log_dir = Path(log_dir) + handler = logging.handlers.RotatingFileHandler(log_dir / "gateway_daemon_api.log", maxBytes=1024 * 1024 * 10) + logging.getLogger("werkzeug").addHandler(handler) + if debug: + logging.getLogger("werkzeug").addHandler(logging.StreamHandler()) + logging.getLogger("werkzeug").setLevel(logging.DEBUG) + else: + logging.getLogger("werkzeug").setLevel(logging.INFO) + if debug: + self.app.config["JSONIFY_PRETTYPRINT_REGULAR"] = True + self.app.config["TESTING"] = True + self.server = make_server(host, port, self.app, threaded=True) + + def run(self): + self.server.serve_forever() + + def shutdown(self): + self.server.shutdown() + + def register_global_routes(self): + # index route returns API version + @self.app.route("/", methods=["GET"]) + def get_index(): + return jsonify({"version": "v1"}) + + # index for v1 api routes, return all available routes as HTML page with links + @self.app.route("/api/v1", methods=["GET"]) + def get_v1_index(): + output = "" + for rule in sorted(self.app.url_map.iter_rules(), key=lambda r: r.rule): + if rule.endpoint != "static": + methods = set(m for m in rule.methods if m not in ["HEAD", "OPTIONS"]) + output += f"{rule.rule}: {methods}
" + return output + + # status route returns if API is up + @self.app.route("/api/v1/status", methods=["GET"]) + def get_status(): + return jsonify({"status": "ok"}) + + # shutdown route + @self.app.route("/api/v1/shutdown", methods=["POST"]) + def shutdown(): + self.shutdown() + return jsonify({"status": "ok"}) + + # serve directory listing of /var/logs/skylark on /api/v1/logs + @self.app.route("/api/v1/logs", methods=["GET"]) + def get_logs(): + if self.log_dir: + out = "" + for f in os.listdir("/var/log/skylark"): + out += f"{f}
" + return out + else: + return "No log directory set", 400 + + # serve log file on /api/v1/logs/ + @self.app.route("/api/v1/logs/", methods=["GET"]) + def get_log(path): + if self.log_dir: + return send_from_directory("/var/log/skylark", path, as_attachment=False, mimetype="text/plain") + else: + return "No log directory set", 400 + + def register_server_routes(self): + # list running gateway servers w/ ports + @self.app.route("/api/v1/servers", methods=["GET"]) + def get_server_ports(): + return jsonify({"server_ports": self.gateway_reciever.server_ports}) + + # add a new server + @self.app.route("/api/v1/servers", methods=["POST"]) + def add_server(): + new_port = self.gateway_reciever.start_server() + return jsonify({"server_port": new_port}) + + # remove a server + @self.app.route("/api/v1/servers/", methods=["DELETE"]) + def remove_server(port: int): + try: + self.gateway_reciever.stop_server(port) + return jsonify({"status": "ok"}) + except ValueError as e: + return jsonify({"error": str(e)}), 400 + + def register_request_routes(self): + def make_chunk_req_payload(chunk_req: ChunkRequest): + state = self.chunk_store.get_chunk_status(chunk_req.chunk.chunk_id) + return {"req": chunk_req.as_dict(), "state": state.name} + + def get_chunk_reqs(state=None): + out = {} + for chunk_req in self.chunk_store.get_chunk_requests(state): + out[chunk_req.chunk.chunk_id] = make_chunk_req_payload(chunk_req) + return out + + def add_chunk_req(body, state): + if isinstance(body, dict): + self.chunk_store.add_chunk_request(ChunkRequest.from_dict(body), state) + return 1 + elif isinstance(body, list): + for chunk_req in body: + self.chunk_store.add_chunk_request(ChunkRequest.from_dict(chunk_req), state) + return len(body) + + # list all chunk requests + # body json options: + # if state is set in body, then filter by state + @self.app.route("/api/v1/chunk_requests", methods=["GET"]) + def get_chunk_requests(): + state_param = request.args.get("state") + if state_param is not None: + try: + state = ChunkState.from_str(state_param) + except ValueError: + return jsonify({"error": "invalid state"}), 400 + return jsonify({"chunk_requests": get_chunk_reqs(state)}) + else: + return jsonify({"chunk_requests": get_chunk_reqs()}) + + # lookup chunk request given chunk id + @self.app.route("/api/v1/chunk_requests/", methods=["GET"]) + def get_chunk_request(chunk_id: int): + chunk_req = self.chunk_store.get_chunk_request(chunk_id) + if chunk_req: + return jsonify({"chunk_requests": [make_chunk_req_payload(chunk_req)]}) + else: + return jsonify({"error": f"Chunk {chunk_id} not found"}), 404 + + # add a new chunk request with default state registered + @self.app.route("/api/v1/chunk_requests", methods=["POST"]) + def add_chunk_request(): + state_param = request.args.get("state", "registered") + n_added = add_chunk_req(request.json, ChunkState.from_str(state_param)) + return jsonify({"status": "ok", "n_added": n_added}) + + # update chunk request + @self.app.route("/api/v1/chunk_requests/", methods=["PUT"]) + def update_chunk_request(chunk_id: int): + chunk_req = self.chunk_store.get_chunk_request(chunk_id) + if chunk_req is None: + return jsonify({"error": f"Chunk {chunk_id} not found"}), 404 + else: + if "state" in request.args: + try: + state = ChunkState.from_str(request.args.get("state")) + except ValueError: + return jsonify({"error": "invalid state"}), 400 + self.chunk_store.set_chunk_status(chunk_id, state) + return jsonify({"status": "ok"}) + else: + return jsonify({"error": "update not supported"}), 400 diff --git a/skylark/gateway/gateway.py b/skylark/gateway/gateway_reciever.py similarity index 64% rename from skylark/gateway/gateway.py rename to skylark/gateway/gateway_reciever.py index e6405ffd2..f2bca3110 100644 --- a/skylark/gateway/gateway.py +++ b/skylark/gateway/gateway_reciever.py @@ -1,33 +1,34 @@ import hashlib import os +import queue import select import signal import socket from contextlib import closing -from multiprocessing import Process, Value +from multiprocessing import Event, Manager, Process, Value from pathlib import Path from typing import List, Tuple import requests from loguru import logger -from skylark.gateway.chunk_store import ChunkStore +from skylark.gateway.chunk_store import ChunkRequest, ChunkStore from skylark.gateway.wire_protocol_header import WireProtocolHeader -from skylark.utils import Timer +from skylark.utils import PathLike, Timer -class Gateway: - def __init__(self, server_blk_size=4096 * 16): - self.chunk_store = ChunkStore() +class GatewayReciever: + def __init__(self, chunk_store: ChunkStore, server_blk_size=4096 * 16): + self.chunk_store = chunk_store self.server_blk_size = server_blk_size self.server_processes = [] self.server_ports = [] @staticmethod - def checksum_sha256(path: Path) -> str: + def checksum_sha256(path: PathLike) -> str: # todo reading the whole file into memory is not ideal, maybe load chunks or use the linux md5 command # todo standardize paths in skylark to be either str or Path or PathLike - with open(path, "rb") as f: + with Path(open).open("rb") as f: hashstr = hashlib.sha256(f.read()).hexdigest() assert len(hashstr) == 64 return hashstr @@ -39,6 +40,9 @@ def get_free_port(self): return s.getsockname()[1] def start_server(self): + # todo a good place to add backpressure? + started_event = Event() + def server_worker(port): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: sock.bind(("0.0.0.0", port)) @@ -52,6 +56,7 @@ def signal_handler(signal, frame): sock.listen() sock.setblocking(False) + started_event.set() while True: if exit_flag.value == 1: logger.warning(f"[server:{port}] Exiting on signal") @@ -69,6 +74,7 @@ def signal_handler(signal, frame): self.server_processes.append(p) self.server_ports.append(port) p.start() + started_event.wait() logger.info(f"[server] Started server (port = {port})") return port @@ -95,47 +101,6 @@ def stop_servers(self): assert len(self.server_ports) == 0 assert len(self.server_processes) == 0 - def send_chunks(self, chunk_ids: List[int], dst_host="127.0.0.1"): - """Send list of chunks to gateway server, pipelining small chunks together into a single socket stream.""" - # notify server of upcoming ChunkRequests - # pop chunk_req.path[0] to remove self - chunk_reqs = [] - for chunk_id in chunk_ids: - chunk_req = self.chunk_store.get_chunk_request(chunk_id) - chunk_req.path.pop(0) - chunk_reqs.append(chunk_req) - response = requests.post(f"http://{dst_host}:8080/api/v1/chunk_requests", json=[c.as_dict() for c in chunk_reqs]) - assert response.status_code == 200 and response.json() == {"status": "ok"} - - # contact server to set up socket connection - response = requests.post(f"http://{dst_host}:8080/api/v1/servers") - assert response.status_code == 200 - dst_port = int(response.json()["server_port"]) - - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: - sock.connect((dst_host, dst_port)) - for idx, chunk_id in enumerate(chunk_ids): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1) # disable Nagle's algorithm - logger.info(f"[client] Sending chunk {chunk_id} to {dst_host}:{dst_port}") - chunk = self.chunk_store.get_chunk(chunk_id) - self.chunk_store.start_upload(chunk_id) - chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id) - header = chunk.to_wire_header(end_of_stream=idx == len(chunk_ids) - 1) - sock.sendall(header.to_bytes()) - with open(chunk_file_path, "rb") as fd: - sock.sendfile(fd) - self.chunk_store.finish_upload(chunk_id) - chunk_file_path.unlink() - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0) # send remaining packets - - # close server - response = requests.delete(f"http://{dst_host}:8080/api/v1/servers/{dst_port}") - assert response.status_code == 200 and response.json() == {"status": "ok"} - - # move chunk_reqs from downloaded to uploaded - for chunk_req in chunk_reqs: - self.chunk_store.mark_chunk_request_uploaded(chunk_req) - def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): logger.info(f"[server] Connection from {addr}") chunks_received = [] diff --git a/skylark/gateway/gateway_sender.py b/skylark/gateway/gateway_sender.py new file mode 100644 index 000000000..b4b5fbe28 --- /dev/null +++ b/skylark/gateway/gateway_sender.py @@ -0,0 +1,125 @@ +import hashlib +import os +import queue +import select +import signal +import socket +from contextlib import closing +from multiprocessing import Event, Manager, Process, Value +from pathlib import Path +import time +from typing import List, Tuple + +import requests +from loguru import logger + +from skylark.gateway.chunk_store import ChunkRequest, ChunkStore +from skylark.gateway.wire_protocol_header import WireProtocolHeader +from skylark.utils import PathLike, Timer + + +class GatewaySender: + def __init__(self, chunk_store: ChunkStore, n_processes=1, batch_size=1): + self.chunk_store = chunk_store + self.n_processes = n_processes + self.batch_size = batch_size + self.processes = [] + + # shared state + self.manager = Manager() + self.next_worker_id = Value("i", 0) + self.worker_queues: queue.Queue[int] = [self.manager.Queue() for _ in range(self.n_processes)] + self.exit_flags = [Event() for _ in range(self.n_processes)] + + def start_workers(self): + for i in range(self.n_processes): + p = Process(target=self.worker_loop, args=(i,)) + p.start() + self.processes.append(p) + + def stop_workers(self): + for i in range(self.n_processes): + self.exit_flags[i].set() + for p in self.processes: + p.join() + self.processes = [] + + def worker_loop(self, id: int): + while not self.exit_flags[id].is_set(): + # get up to pipeline_batch_size chunks from the queue + # todo should we block here to wait for more chunks? + chunks_to_send: List[ChunkRequest] = [] + while len(chunks_to_send) < self.batch_size: + try: + next_chunk_id = self.worker_queues[id].get_nowait() + chunks_to_send.append(self.chunk_store.get_chunk_request(next_chunk_id)) + except queue.Empty: + break + + # check next hop is the same for all chunks in the batch + if chunks_to_send: + next_hop = chunks_to_send[0].path[0] + assert all(next_hop.hop_cloud_region == chunk.path[0].hop_cloud_region for chunk in chunks_to_send) + assert all(next_hop.hop_ip_address == chunk.path[0].hop_ip_address for chunk in chunks_to_send) + + # send chunks + chunk_ids = [req.chunk.chunk_id for req in chunks_to_send] + self.send_chunks(chunk_ids, next_hop.hop_ip_address) + time.sleep(0.1) + + def queue_request(self, chunk_request: ChunkRequest): + # todo go beyond round robin routing? how to handle stragglers or variable-sized objects? + with self.next_worker_id.get_lock(): + worker_id = self.next_worker_id.value + self.worker_queues[worker_id].put(chunk_request.chunk.chunk_id) + self.next_worker_id.value = (worker_id + 1) % self.n_processes + + def send_chunks(self, chunk_ids: List[int], dst_host="127.0.0.1"): + """Send list of chunks to gateway server, pipelining small chunks together into a single socket stream.""" + # notify server of upcoming ChunkRequests + # pop chunk_req.path[0] to remove self + chunk_reqs = [] + for chunk_id in chunk_ids: + chunk_req = self.chunk_store.get_chunk_request(chunk_id) + chunk_req.path.pop(0) + chunk_reqs.append(chunk_req) + response = requests.post(f"http://{dst_host}:8080/api/v1/chunk_requests", json=[c.as_dict() for c in chunk_reqs]) + assert response.status_code == 200 and response.json()["status"] == "ok" + + # contact server to set up socket connection + response = requests.post(f"http://{dst_host}:8080/api/v1/servers") + assert response.status_code == 200 + dst_port = int(response.json()["server_port"]) + + logger.info(f"sending {len(chunk_ids)} chunks to {dst_host}:{dst_port}") + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + sock.connect((dst_host, dst_port)) + for idx, chunk_id in enumerate(chunk_ids): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1) # disable Nagle's algorithm + logger.warning(f"[sender] Sending chunk {chunk_id} to {dst_host}:{dst_port}") + chunk = self.chunk_store.get_chunk_request(chunk_id).chunk + + # send chunk header + self.chunk_store.start_upload(chunk_id) + chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id) + header = chunk.to_wire_header(end_of_stream=idx == len(chunk_ids) - 1) + sock.sendall(header.to_bytes()) + logger.debug(f"[sender] Sent chunk header {header}") + + # send chunk data + assert chunk_file_path.exists(), f"chunk file {chunk_file_path} does not exist" + with open(chunk_file_path, "rb") as fd: + bytes_sent = sock.sendfile(fd) + logger.debug(f"[sender] Sent chunk data {bytes_sent} bytes") + + self.chunk_store.finish_upload(chunk_id) + chunk_file_path.unlink() + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0) # send remaining packets + + # close server + response = requests.delete(f"http://{dst_host}:8080/api/v1/servers/{dst_port}") + assert response.status_code == 200 and response.json() == {"status": "ok"} + + # move chunk_reqs from downloaded to uploaded + for chunk_req in chunk_reqs: + self.chunk_store.mark_chunk_request_uploaded(chunk_req) diff --git a/skylark/gateway/wire_protocol_header.py b/skylark/gateway/wire_protocol_header.py index ad0556973..eac3593a1 100644 --- a/skylark/gateway/wire_protocol_header.py +++ b/skylark/gateway/wire_protocol_header.py @@ -1,11 +1,16 @@ import socket from dataclasses import dataclass +from loguru import logger + @dataclass class WireProtocolHeader: """Lightweight wire protocol header for chunk transfers along socket.""" + # todo add protocol version + # todo merge with chunkrequest + chunk_id: int # unsigned long chunk_len: int # unsigned long end_of_stream: bool = False # false by default, but true if this is the last chunk @@ -21,7 +26,7 @@ def length_bytes(): @staticmethod def from_bytes(data: bytes): - assert len(data) == WireProtocolHeader.length_bytes() + assert len(data) == WireProtocolHeader.length_bytes(), f"{len(data)} != {WireProtocolHeader.length_bytes()}" magic = int.from_bytes(data[:8], byteorder="big") if magic != WireProtocolHeader.magic_hex(): raise ValueError("Invalid magic number") @@ -36,12 +41,15 @@ def to_bytes(self): out_bytes += self.chunk_id.to_bytes(8, byteorder="big") out_bytes += self.chunk_len.to_bytes(8, byteorder="big") out_bytes += bytes([int(self.end_of_stream)]) - assert len(out_bytes) == WireProtocolHeader.length_bytes() + assert len(out_bytes) == WireProtocolHeader.length_bytes(), f"{len(out_bytes)} != {WireProtocolHeader.length_bytes()}" return out_bytes @staticmethod def from_socket(sock: socket.socket): - header_bytes = sock.recv(WireProtocolHeader.length_bytes()) + num_bytes = WireProtocolHeader.length_bytes() + logger.info(f"Reading {num_bytes} bytes from socket") + header_bytes = sock.recv(num_bytes) + assert len(header_bytes) == num_bytes, f"{len(header_bytes)} != {num_bytes}" return WireProtocolHeader.from_bytes(header_bytes) def to_socket(self, sock: socket.socket): diff --git a/skylark/replicate/obj_store.py b/skylark/replicate/obj_store.py index e1b943ae7..cba3cafb9 100644 --- a/skylark/replicate/obj_store.py +++ b/skylark/replicate/obj_store.py @@ -7,6 +7,9 @@ from awscrt.http import HttpHeaders, HttpRequest from awscrt.io import ClientBootstrap, DefaultHostResolver, EventLoopGroup from awscrt.s3 import S3Client, S3RequestTlsMode, S3RequestType +import boto3 +import botocore.exceptions + from skylark.compute.aws.aws_server import AWSServer @@ -31,6 +34,10 @@ def upload_object(self, src_file_path, dst_object_name, content_type="infer"): raise NotImplementedError +class NoSuchObjectException(Exception): + pass + + class S3Interface(ObjectStoreInterface): def __init__(self, aws_region, bucket_name, use_tls=True): self.aws_region = aws_region @@ -86,11 +93,21 @@ def delete_objects(self, keys: List[str]): def get_obj_metadata(self, obj_name): s3_client = AWSServer.get_boto3_client("s3", self.aws_region) - return s3_client.head_object(Bucket=self.bucket_name, Key=str(obj_name).lstrip("/")) + try: + return s3_client.head_object(Bucket=self.bucket_name, Key=str(obj_name).lstrip("/")) + except botocore.exceptions.ClientError as e: + raise NoSuchObjectException(f"Object {obj_name} does not exist, or you do not have permission to access it") from e def get_obj_size(self, obj_name): return self.get_obj_metadata(obj_name)["ContentLength"] + def exists(self, obj_name): + try: + self.get_obj_metadata(obj_name) + return True + except NoSuchObjectException: + return False + # todo: implement range request for download def download_object(self, src_object_name, dst_file_path) -> Future: src_object_name, dst_file_path = str(src_object_name), str(dst_file_path) diff --git a/skylark/replicate/replicator_client.py b/skylark/replicate/replicator_client.py index f988ab1b4..88f209b8b 100644 --- a/skylark/replicate/replicator_client.py +++ b/skylark/replicate/replicator_client.py @@ -74,8 +74,13 @@ def start_gateway_instance(self, server: Server): logger.info(f"Deleting old container {container_id}") server.run_command(f"sudo docker rm {container_id}") - gateway_cmd = f"sudo docker run -d --rm --ipc=host --network=host {self.gateway_docker_image} /env/bin/python /pkg/skylark/gateway/gateway_daemon.py" - server.run_command(gateway_cmd) + docker_run_flags = "-d --log-driver=local --ipc=host --network=host" + # todo add other launch flags for gateway daemon + gateway_daemon_cmd = "/env/bin/python /pkg/skylark/gateway/gateway_daemon.py --debug --chunk-dir /dev/shm/skylark/chunks" + docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skylark_gateway {self.gateway_docker_image} {gateway_daemon_cmd}" + start_out, start_err = server.run_command(docker_launch_cmd) + assert not start_err, f"Error starting gateway: {start_err}" + container_id = start_out.strip() # wait for gateways to start (check status API) def is_ready(): @@ -85,7 +90,13 @@ def is_ready(): except Exception as e: return False - wait_for(is_ready, timeout=60, interval=1) + try: + wait_for(is_ready, timeout=10, interval=0.1) + except Exception as e: + logger.error(f"Gateway {server.instance_name} is not ready") + logs, err = server.run_command(f"sudo docker logs skylark_gateway --tail=100") + logger.error(f"Docker logs: {logs}\nerr: {err}") + raise e def kill_gateway_instance(self, server: Server): logger.warning(f"Killing gateway container on {server.instance_name}") @@ -212,7 +223,7 @@ def run_replication_plan(self, job: ReplicationJob): src_path = ChunkRequestHop( hop_cloud_region=src_instance.region_tag, hop_ip_address=src_instance.public_ip, - chunk_location_type="src_object_store", + chunk_location_type="random-128MB", # todo src_object_store src_object_store_region=src_instance.region_tag, src_object_store_bucket=job.source_bucket, ) @@ -247,8 +258,9 @@ def run_replication_plan(self, job: ReplicationJob): # send ChunkRequests to each gateway instance def send_chunk_req(instance: Server, chunk_reqs: List[ChunkRequest]): + logger.debug(f"Sending {len(chunk_reqs)} chunk requests to {instance.public_ip}") body = [c.as_dict() for c in chunk_reqs] - reply = requests.post(f"http://{instance.public_ip}:8080/api/v1/chunk_requests/pending", json=body) + reply = requests.post(f"http://{instance.public_ip}:8080/api/v1/chunk_requests", json=body) if reply.status_code != 200: raise Exception(f"Failed to send chunk requests to gateway instance {instance.instance_name}: {reply.text}") return reply diff --git a/skylark/test/test_gateway_daemon.py b/skylark/test/test_gateway_daemon.py new file mode 100644 index 000000000..54d487f61 --- /dev/null +++ b/skylark/test/test_gateway_daemon.py @@ -0,0 +1,49 @@ +import atexit +from pathlib import Path + +from loguru import logger +from skylark.gateway.chunk_store import Chunk, ChunkRequest, ChunkRequestHop, ChunkState +from skylark.gateway.gateway_daemon import GatewayDaemon +from skylark.replicate.obj_store import S3Interface + + +if __name__ == "__main__": + daemon = GatewayDaemon("/dev/shm/skylark/chunks", debug=True) + + # make obj store interfaces + src_obj_interface = S3Interface("us-east-1", "skylark-us-east-1") + dst_obj_interface = S3Interface("us-west-1", "skylark-us-west-1") + obj = "/test.txt" + + # make random test.txt file and upload it if it doesn't exist + if not src_obj_interface.exists(obj): + logger.info(f"Uploading {obj} to {src_obj_interface.bucket_name}") + test_file = Path("/tmp/test.txt") + test_file.write_text("test") + src_obj_interface.upload_object(test_file, obj).result() + + # make chunk request + file_size_bytes = src_obj_interface.get_obj_size(obj) + chunk = Chunk( + key=obj, + chunk_id=0, + file_offset_bytes=0, + chunk_length_bytes=file_size_bytes, + chunk_hash_sha256=None, + ) + src_path = ChunkRequestHop( + hop_cloud_region="aws:us-east-1", + hop_ip_address="localhost", + chunk_location_type="relay", + src_object_store_region="us-east-1", + src_object_store_bucket="skylark-us-east-1", + ) + req = ChunkRequest(chunk=chunk, path=[src_path]) + logger.debug(f"Chunk request: {req}") + + # send chunk request to gateway + daemon.chunk_store.add_chunk_request(req, ChunkState.ready_to_upload) + assert daemon.chunk_store.get_chunk_request(req.chunk.chunk_id) == req + + # run gateway daemon + daemon.run() diff --git a/skylark/test/test_null_replicator.py b/skylark/test/test_null_replicator.py deleted file mode 100644 index cf97e3666..000000000 --- a/skylark/test/test_null_replicator.py +++ /dev/null @@ -1,23 +0,0 @@ -from skylark.replicate.replication_plan import ReplicationTopology -from skylark.replicate.replicator_client import ReplicatorCoordinator - - -def check_pathset(paths, gcp_project, gateway_docker_image): - topo = ReplicationTopology(paths) - replicator = ReplicatorCoordinator( - topology=topo, - gcp_project=gcp_project, - gateway_docker_image=gateway_docker_image, - ) - replicator.provision_gateways() - - # ensure all instances are running w/ hello world docker image - for path in replicator.bound_paths: - for server in path: - out, err = server.run_command("sudo docker run --rm hello-world") - assert "Hello from Docker!" in out - replicator.deprovision_gateways() - - -def test_direct_path(): - check_pathset([["aws:us-east-1", "aws:us-west-1"]], "skylark-333700", "ghcr.io/parasj/skylark:latest") diff --git a/skylark/replicate/test_replicator_client.py b/skylark/test/test_replicator_client.py similarity index 100% rename from skylark/replicate/test_replicator_client.py rename to skylark/test/test_replicator_client.py From da8f802a45ed80ba9779c7e163efc57171bd221b Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Fri, 31 Dec 2021 09:26:41 +0000 Subject: [PATCH 2/9] seems to work? --- skylark/gateway/chunk_store.py | 26 ++++++++++++++---- skylark/gateway/gateway_daemon.py | 38 ++++++++++++++++++-------- skylark/gateway/gateway_reciever.py | 12 ++++++-- skylark/gateway/gateway_sender.py | 34 +++++++++++------------ skylark/replicate/replicator_client.py | 4 +-- 5 files changed, 75 insertions(+), 39 deletions(-) diff --git a/skylark/gateway/chunk_store.py b/skylark/gateway/chunk_store.py index bc5ac86ad..6f30655c6 100644 --- a/skylark/gateway/chunk_store.py +++ b/skylark/gateway/chunk_store.py @@ -73,7 +73,7 @@ def from_dict(in_dict: Dict): class ChunkState(Enum): registered = auto() download_in_progress = auto() - ready_to_upload = auto() + downloaded = auto() upload_in_progress = auto() upload_complete = auto() failed = auto() @@ -108,30 +108,35 @@ def get_chunk_status(self, chunk_id: int) -> Optional[ChunkState]: return self.chunk_status[chunk_id] if chunk_id in self.chunk_status else None def set_chunk_status(self, chunk_id: int, new_status: ChunkState): + logger.debug(f"Setting chunk {chunk_id} status to {new_status}") self.chunk_status[chunk_id] = new_status def start_download(self, chunk_id: int): - if self.get_chunk_status(chunk_id) == ChunkState.registered: + state = self.get_chunk_status(chunk_id) + if state in [ChunkState.registered, ChunkState.download_in_progress]: self.set_chunk_status(chunk_id, ChunkState.download_in_progress) else: raise ValueError(f"Invalid transition start_download from {self.get_chunk_status(chunk_id)}") def finish_download(self, chunk_id: int, runtime_s: Optional[float] = None): # todo log runtime to statistics store - if self.get_chunk_status(chunk_id) == ChunkState.download_in_progress: - self.set_chunk_status(chunk_id, ChunkState.ready_to_upload) + state = self.get_chunk_status(chunk_id) + if state in [ChunkState.download_in_progress, ChunkState.downloaded]: + self.set_chunk_status(chunk_id, ChunkState.downloaded) else: raise ValueError(f"Invalid transition finish_download from {self.get_chunk_status(chunk_id)}") def start_upload(self, chunk_id: int): - if self.get_chunk_status(chunk_id) == ChunkState.ready_to_upload: + state = self.get_chunk_status(chunk_id) + if state in [ChunkState.downloaded, ChunkState.upload_in_progress]: self.set_chunk_status(chunk_id, ChunkState.upload_in_progress) else: raise ValueError(f"Invalid transition start_upload from {self.get_chunk_status(chunk_id)}") def finish_upload(self, chunk_id: int, runtime_s: Optional[float] = None): # todo log runtime to statistics store - if self.get_chunk_status(chunk_id) == ChunkState.upload_in_progress: + state = self.get_chunk_status(chunk_id) + if state in [ChunkState.upload_in_progress, ChunkState.upload_complete]: self.set_chunk_status(chunk_id, ChunkState.upload_complete) else: raise ValueError(f"Invalid transition finish_upload from {self.get_chunk_status(chunk_id)}") @@ -158,3 +163,12 @@ def add_chunk_request(self, chunk_request: ChunkRequest, state=ChunkState.regist logger.debug(f"Adding chunk request {chunk_request.chunk.chunk_id}") self.set_chunk_status(chunk_request.chunk.chunk_id, state) self.chunk_requests[chunk_request.chunk.chunk_id] = chunk_request + + def pop_chunk_request_path(self, chunk_id: int) -> Optional[ChunkRequestHop]: + if chunk_id in self.chunk_requests: + chunk_request = self.chunk_requests[chunk_id] + if len(chunk_request.path) > 0: + result = chunk_request.path.pop(0) + self.chunk_requests[chunk_id] = chunk_request + return result + return None diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index f499c5cfe..ef423f484 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -57,17 +57,26 @@ def exit_handler(signum, frame): logger.info("Starting daemon loop") while not exit_flag.is_set(): # queue object uploads and relays - for chunk_req in self.chunk_store.get_chunk_requests(ChunkState.ready_to_upload): - logger.info(f"Relaying chunk {chunk_req.chunk.chunk_id}") + for chunk_req in self.chunk_store.get_chunk_requests(ChunkState.downloaded): if len(chunk_req.path) > 0: current_hop = chunk_req.path[0] if current_hop.chunk_location_type == "dst_object_store": - logger.warning(f"NOT IMPLEMENTED: Queuing object store upload for chunk {chunk_req.chunk_id}") - elif current_hop.chunk_location_type == "relay": + logger.warning(f"NOT IMPLEMENTED: Queuing object store upload for chunk {chunk_req.chunk.chunk_id}") + self.chunk_store.fail(chunk_req.chunk.chunk_id) + elif ( + current_hop.chunk_location_type == "src_object_store" + or current_hop.chunk_location_type == "relay" + or current_hop.chunk_location_type.startswith("random_") + ): logger.info(f"Queuing chunk {chunk_req.chunk.chunk_id} for relay") self.gateway_sender.queue_request(chunk_req) self.chunk_store.start_upload(chunk_req.chunk.chunk_id) + elif current_hop.chunk_location_type == "save_local": + # do nothing, done + pass else: + logger.error(f"Unknown chunk location type {current_hop.chunk_location_type}") + self.chunk_store.fail(chunk_req.chunk.chunk_id) raise ValueError(f"Unknown or incorrect chunk_location_type {current_hop.chunk_location_type}") else: logger.error(f"Ready to upload chunk {chunk_req.chunk_id} has no hops") @@ -75,25 +84,32 @@ def exit_handler(signum, frame): # queue object store downloads and relays (if space is available) # todo ensure space is available for chunk_req in self.chunk_store.get_chunk_requests(ChunkState.registered): - logger.info(f"Downloading chunk {chunk_req.chunk.chunk_id}") if len(chunk_req.path) > 0: current_hop = chunk_req.path[0] if current_hop.chunk_location_type == "src_object_store": - logger.warning(f"NOT IMPLEMENTED: Queuing object store download for chunk {chunk_req.chunk_id}") - elif current_hop.chunk_location_type.startswith("random-"): + logger.warning(f"NOT IMPLEMENTED: Queuing object store download for chunk {chunk_req.chunk.chunk_id}") + self.chunk_store.fail(chunk_req.chunk.chunk_id) + elif current_hop.chunk_location_type.startswith("random_"): self.chunk_store.start_download(chunk_req.chunk.chunk_id) - size_mb = re.search(r"random-(\d+)MB", current_hop.chunk_location_type).group(1) + size_mb = re.search(r"random_(\d+)MB", current_hop.chunk_location_type).group(1) logger.info(f"Generating {size_mb}MB random chunk {chunk_req.chunk.chunk_id}") with self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).open("wb") as f: - f.write(os.urandom(int(size_mb) * 1024 * 1024)) + f.write(os.urandom(int(size_mb * 1e6))) + + # update chunk size + chunk_req.chunk.chunk_length_bytes = int(size_mb * 1e6) + self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req self.chunk_store.finish_download(chunk_req.chunk.chunk_id) + elif current_hop.chunk_location_type == "relay" or current_hop.chunk_location_type == "save_local": + # do nothing, waiting for chunk to be be ready_to_upload + continue else: + logger.error(f"Unknown chunk location type {current_hop.chunk_location_type}") + self.chunk_store.fail(chunk_req.chunk.chunk_id) raise ValueError(f"Unknown or incorrect chunk_location_type {current_hop.chunk_location_type}") else: logger.error(f"Registered chunk {chunk_req.chunk_id} has no hops") - time.sleep(0.5) - if __name__ == "__main__": parser = argparse.ArgumentParser(description="Skylark Gateway Daemon") diff --git a/skylark/gateway/gateway_reciever.py b/skylark/gateway/gateway_reciever.py index f2bca3110..89bd929ba 100644 --- a/skylark/gateway/gateway_reciever.py +++ b/skylark/gateway/gateway_reciever.py @@ -110,17 +110,23 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): self.chunk_store.start_download(chunk_header.chunk_id) with Timer() as t: chunk_data_size = chunk_header.chunk_len + chunk_received_size = 0 chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_header.chunk_id) with chunk_file_path.open("wb") as f: while chunk_data_size > 0: data = conn.recv(min(chunk_data_size, self.server_blk_size)) f.write(data) chunk_data_size -= len(data) + chunk_received_size += len(data) + logger.info(f"[server] Received chunk {chunk_header.chunk_id} ({chunk_received_size} bytes) in {t.elapsed} seconds") # check hash, update status and close socket if transfer is complete - if self.checksum_sha256(chunk_file_path) != chunk_header.chunk_hash_sha256: - raise ValueError(f"Received chunk {chunk_header.chunk_id} with invalid hash") + # todo write checksums upon read from object store + # if self.checksum_sha256(chunk_file_path) != chunk_header.chunk_hash_sha256: + # raise ValueError(f"Received chunk {chunk_header.chunk_id} with invalid hash") self.chunk_store.finish_download(chunk_header.chunk_id, t.elapsed) - self.chunk_store.mark_chunk_request_downloaded(chunk_header.chunk_id) + chunks_received.append(chunk_header.chunk_id) + logger.info(f"[server] Received chunk {chunk_header.chunk_id}") if chunk_header.end_of_stream: + logger.info(f"[server] Received end of stream, returning") conn.close() return chunks_received diff --git a/skylark/gateway/gateway_sender.py b/skylark/gateway/gateway_sender.py index b4b5fbe28..c27a819c1 100644 --- a/skylark/gateway/gateway_sender.py +++ b/skylark/gateway/gateway_sender.py @@ -47,30 +47,34 @@ def stop_workers(self): def worker_loop(self, id: int): while not self.exit_flags[id].is_set(): # get up to pipeline_batch_size chunks from the queue - # todo should we block here to wait for more chunks? - chunks_to_send: List[ChunkRequest] = [] - while len(chunks_to_send) < self.batch_size: + chunk_ids_to_send = [] + while len(chunk_ids_to_send) < self.batch_size: try: - next_chunk_id = self.worker_queues[id].get_nowait() - chunks_to_send.append(self.chunk_store.get_chunk_request(next_chunk_id)) + chunk_ids_to_send.append(self.worker_queues[id].get_nowait()) except queue.Empty: break # check next hop is the same for all chunks in the batch - if chunks_to_send: - next_hop = chunks_to_send[0].path[0] - assert all(next_hop.hop_cloud_region == chunk.path[0].hop_cloud_region for chunk in chunks_to_send) - assert all(next_hop.hop_ip_address == chunk.path[0].hop_ip_address for chunk in chunks_to_send) + if chunk_ids_to_send: + logger.debug(f"worker {id} sending {len(chunk_ids_to_send)} chunks") + chunks = [] + for idx in chunk_ids_to_send: + self.chunk_store.pop_chunk_request_path(idx) + chunks.append(self.chunk_store.get_chunk_request(idx)) + next_hop = chunks[0].path[0] + assert all(next_hop.hop_cloud_region == chunk.path[0].hop_cloud_region for chunk in chunks) + assert all(next_hop.hop_ip_address == chunk.path[0].hop_ip_address for chunk in chunks) # send chunks - chunk_ids = [req.chunk.chunk_id for req in chunks_to_send] + chunk_ids = [req.chunk.chunk_id for req in chunks] self.send_chunks(chunk_ids, next_hop.hop_ip_address) - time.sleep(0.1) + time.sleep(0.1) # short interval to batch requests def queue_request(self, chunk_request: ChunkRequest): # todo go beyond round robin routing? how to handle stragglers or variable-sized objects? with self.next_worker_id.get_lock(): worker_id = self.next_worker_id.value + logger.debug(f"queuing chunk request {chunk_request.chunk.chunk_id} to worker {worker_id}") self.worker_queues[worker_id].put(chunk_request.chunk.chunk_id) self.next_worker_id.value = (worker_id + 1) % self.n_processes @@ -78,11 +82,7 @@ def send_chunks(self, chunk_ids: List[int], dst_host="127.0.0.1"): """Send list of chunks to gateway server, pipelining small chunks together into a single socket stream.""" # notify server of upcoming ChunkRequests # pop chunk_req.path[0] to remove self - chunk_reqs = [] - for chunk_id in chunk_ids: - chunk_req = self.chunk_store.get_chunk_request(chunk_id) - chunk_req.path.pop(0) - chunk_reqs.append(chunk_req) + chunk_reqs = [self.chunk_store.get_chunk_request(chunk_id) for chunk_id in chunk_ids] response = requests.post(f"http://{dst_host}:8080/api/v1/chunk_requests", json=[c.as_dict() for c in chunk_reqs]) assert response.status_code == 200 and response.json()["status"] == "ok" @@ -100,7 +100,6 @@ def send_chunks(self, chunk_ids: List[int], dst_host="127.0.0.1"): chunk = self.chunk_store.get_chunk_request(chunk_id).chunk # send chunk header - self.chunk_store.start_upload(chunk_id) chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id) header = chunk.to_wire_header(end_of_stream=idx == len(chunk_ids) - 1) sock.sendall(header.to_bytes()) @@ -109,6 +108,7 @@ def send_chunks(self, chunk_ids: List[int], dst_host="127.0.0.1"): # send chunk data assert chunk_file_path.exists(), f"chunk file {chunk_file_path} does not exist" with open(chunk_file_path, "rb") as fd: + logger.debug(f"[sender] Sending file") bytes_sent = sock.sendfile(fd) logger.debug(f"[sender] Sent chunk data {bytes_sent} bytes") diff --git a/skylark/replicate/replicator_client.py b/skylark/replicate/replicator_client.py index 88f209b8b..72e528f8e 100644 --- a/skylark/replicate/replicator_client.py +++ b/skylark/replicate/replicator_client.py @@ -223,14 +223,14 @@ def run_replication_plan(self, job: ReplicationJob): src_path = ChunkRequestHop( hop_cloud_region=src_instance.region_tag, hop_ip_address=src_instance.public_ip, - chunk_location_type="random-128MB", # todo src_object_store + chunk_location_type="random_128MB", # todo src_object_store src_object_store_region=src_instance.region_tag, src_object_store_bucket=job.source_bucket, ) dst_path = ChunkRequestHop( hop_cloud_region=dst_instance.region_tag, hop_ip_address=dst_instance.public_ip, - chunk_location_type="dst_object_store", + chunk_location_type="save_local", # dst_object_store dst_object_store_region=dst_instance.region_tag, dst_object_store_bucket=job.dest_bucket, ) From b5add18c783907e6252ccb1dcf745c768356ed28 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Fri, 31 Dec 2021 09:36:23 +0000 Subject: [PATCH 3/9] works! --- skylark/gateway/gateway_daemon.py | 2 +- skylark/gateway/gateway_reciever.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index ef423f484..22fc5c3ac 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -91,7 +91,7 @@ def exit_handler(signum, frame): self.chunk_store.fail(chunk_req.chunk.chunk_id) elif current_hop.chunk_location_type.startswith("random_"): self.chunk_store.start_download(chunk_req.chunk.chunk_id) - size_mb = re.search(r"random_(\d+)MB", current_hop.chunk_location_type).group(1) + size_mb = int(re.search(r"random_(\d+)MB", current_hop.chunk_location_type).group(1)) logger.info(f"Generating {size_mb}MB random chunk {chunk_req.chunk.chunk_id}") with self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).open("wb") as f: f.write(os.urandom(int(size_mb * 1e6))) diff --git a/skylark/gateway/gateway_reciever.py b/skylark/gateway/gateway_reciever.py index 89bd929ba..35bc66456 100644 --- a/skylark/gateway/gateway_reciever.py +++ b/skylark/gateway/gateway_reciever.py @@ -118,14 +118,13 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): f.write(data) chunk_data_size -= len(data) chunk_received_size += len(data) - logger.info(f"[server] Received chunk {chunk_header.chunk_id} ({chunk_received_size} bytes) in {t.elapsed} seconds") # check hash, update status and close socket if transfer is complete # todo write checksums upon read from object store # if self.checksum_sha256(chunk_file_path) != chunk_header.chunk_hash_sha256: # raise ValueError(f"Received chunk {chunk_header.chunk_id} with invalid hash") self.chunk_store.finish_download(chunk_header.chunk_id, t.elapsed) chunks_received.append(chunk_header.chunk_id) - logger.info(f"[server] Received chunk {chunk_header.chunk_id}") + logger.info(f"[server] Received chunk {chunk_header.chunk_id} ({chunk_received_size} bytes) in {t.elapsed:.2f}s") if chunk_header.end_of_stream: logger.info(f"[server] Received end of stream, returning") conn.close() From e4a5e5c338ea9d37fc272aa3bb54945ab032175d Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Fri, 31 Dec 2021 09:47:10 +0000 Subject: [PATCH 4/9] Fix bug with server shutdown --- skylark/gateway/gateway_reciever.py | 4 +-- skylark/gateway/gateway_sender.py | 2 +- skylark/replicate/replicator_client.py | 7 ++-- skylark/test/test_replicator_client.py | 44 ++++++++++++++------------ 4 files changed, 31 insertions(+), 26 deletions(-) diff --git a/skylark/gateway/gateway_reciever.py b/skylark/gateway/gateway_reciever.py index 35bc66456..bad065729 100644 --- a/skylark/gateway/gateway_reciever.py +++ b/skylark/gateway/gateway_reciever.py @@ -81,7 +81,7 @@ def signal_handler(signal, frame): def stop_server(self, port: int): matched_process = None for server_port, server_process in zip(self.server_ports, self.server_processes): - if port == port: + if server_port == port: matched_process = server_process break if matched_process is None: @@ -124,7 +124,7 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): # raise ValueError(f"Received chunk {chunk_header.chunk_id} with invalid hash") self.chunk_store.finish_download(chunk_header.chunk_id, t.elapsed) chunks_received.append(chunk_header.chunk_id) - logger.info(f"[server] Received chunk {chunk_header.chunk_id} ({chunk_received_size} bytes) in {t.elapsed:.2f}s") + logger.info(f"[server] Received chunk {chunk_header.chunk_id} ({chunk_received_size} bytes) in {t.elapsed:.2f} seconds") if chunk_header.end_of_stream: logger.info(f"[server] Received end of stream, returning") conn.close() diff --git a/skylark/gateway/gateway_sender.py b/skylark/gateway/gateway_sender.py index c27a819c1..972952707 100644 --- a/skylark/gateway/gateway_sender.py +++ b/skylark/gateway/gateway_sender.py @@ -118,7 +118,7 @@ def send_chunks(self, chunk_ids: List[int], dst_host="127.0.0.1"): # close server response = requests.delete(f"http://{dst_host}:8080/api/v1/servers/{dst_port}") - assert response.status_code == 200 and response.json() == {"status": "ok"} + assert response.status_code == 200 and response.json() == {"status": "ok"}, response.json() # move chunk_reqs from downloaded to uploaded for chunk_req in chunk_reqs: diff --git a/skylark/replicate/replicator_client.py b/skylark/replicate/replicator_client.py index 72e528f8e..f1fa7677b 100644 --- a/skylark/replicate/replicator_client.py +++ b/skylark/replicate/replicator_client.py @@ -76,7 +76,9 @@ def start_gateway_instance(self, server: Server): docker_run_flags = "-d --log-driver=local --ipc=host --network=host" # todo add other launch flags for gateway daemon - gateway_daemon_cmd = "/env/bin/python /pkg/skylark/gateway/gateway_daemon.py --debug --chunk-dir /dev/shm/skylark/chunks" + gateway_daemon_cmd = ( + "/env/bin/python /pkg/skylark/gateway/gateway_daemon.py --debug --chunk-dir /dev/shm/skylark/chunks --outgoing-connections 8" + ) docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skylark_gateway {self.gateway_docker_image} {gateway_daemon_cmd}" start_out, start_err = server.run_command(docker_launch_cmd) assert not start_err, f"Error starting gateway: {start_err}" @@ -212,7 +214,8 @@ def run_replication_plan(self, job: ReplicationJob): for idx, obj in enumerate(job.objs): # todo support multipart files # todo support multiple paths - file_size_bytes = src_obj_interface.get_obj_size(obj) + # file_size_bytes = src_obj_interface.get_obj_size(obj) + file_size_bytes = -1 chunk = Chunk( key=obj, chunk_id=idx, diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index 84443c0d4..fb895916b 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -39,27 +39,29 @@ def main(args): s3_interface_dst.create_bucket() if not args.skip_upload: - matching_src_keys = list(s3_interface_src.list_objects(prefix=args.key_prefix)) - matching_dst_keys = list(s3_interface_dst.list_objects(prefix=args.key_prefix)) - if matching_src_keys: - logger.warning(f"Deleting objects from source bucket: {matching_src_keys}") - s3_interface_src.delete_objects(matching_src_keys) - if matching_dst_keys: - logger.warning(f"Deleting objects from destination bucket: {matching_dst_keys}") - s3_interface_dst.delete_objects(matching_dst_keys) - - # create test objects w/ random data - logger.info("Creating test objects") - obj_keys = [] - futures = [] - with tempfile.NamedTemporaryFile() as f: - f.write(os.urandom(int(1e6 * args.chunk_size_mb))) - f.seek(0) - for i in trange(args.n_chunks): - k = f"{args.key_prefix}/{i}" - futures.append(s3_interface_src.upload_object(f.name, k)) - obj_keys.append(k) - concurrent.futures.wait(futures) + # todo implement object store support + pass + # matching_src_keys = list(s3_interface_src.list_objects(prefix=args.key_prefix)) + # matching_dst_keys = list(s3_interface_dst.list_objects(prefix=args.key_prefix)) + # if matching_src_keys: + # logger.warning(f"Deleting objects from source bucket: {matching_src_keys}") + # s3_interface_src.delete_objects(matching_src_keys) + # if matching_dst_keys: + # logger.warning(f"Deleting objects from destination bucket: {matching_dst_keys}") + # s3_interface_dst.delete_objects(matching_dst_keys) + + # # create test objects w/ random data + # logger.info("Creating test objects") + # obj_keys = [] + # futures = [] + # with tempfile.NamedTemporaryFile() as f: + # f.write(os.urandom(int(1e6 * args.chunk_size_mb))) + # f.seek(0) + # for i in trange(args.n_chunks): + # k = f"{args.key_prefix}/{i}" + # futures.append(s3_interface_src.upload_object(f.name, k)) + # obj_keys.append(k) + # concurrent.futures.wait(futures) else: obj_keys = [f"{args.key_prefix}/{i}" for i in range(args.n_chunks)] From b6aec3630a0c39102d61183e4d28e5f7f0b03e2e Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Fri, 31 Dec 2021 10:19:10 +0000 Subject: [PATCH 5/9] chasing down race conditions --- skylark/gateway/gateway_daemon.py | 21 ++++++++++++--------- skylark/gateway/gateway_reciever.py | 17 +++++++++++++---- skylark/gateway/gateway_sender.py | 4 ---- skylark/test/test_replicator_client.py | 8 ++++---- 4 files changed, 29 insertions(+), 21 deletions(-) diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index 22fc5c3ac..cf224bb17 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -7,6 +7,7 @@ from os import PathLike from pathlib import Path import sys +import threading import time from typing import Optional @@ -24,7 +25,6 @@ def __init__(self, chunk_dir: PathLike, debug=False, log_dir: Optional[PathLike] log_dir = Path(log_dir) log_dir.mkdir(exist_ok=True) logger.add(log_dir / "gateway_daemon.log", rotation="10 MB") - logger.add(sys.stderr, level="DEBUG" if debug else "INFO") self.chunk_store = ChunkStore(chunk_dir) self.gateway_reciever = GatewayReciever(chunk_store=self.chunk_store) self.gateway_sender = GatewaySender(chunk_store=self.chunk_store, n_processes=outgoing_connections, batch_size=outgoing_batch_size) @@ -92,14 +92,17 @@ def exit_handler(signum, frame): elif current_hop.chunk_location_type.startswith("random_"): self.chunk_store.start_download(chunk_req.chunk.chunk_id) size_mb = int(re.search(r"random_(\d+)MB", current_hop.chunk_location_type).group(1)) - logger.info(f"Generating {size_mb}MB random chunk {chunk_req.chunk.chunk_id}") - with self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).open("wb") as f: - f.write(os.urandom(int(size_mb * 1e6))) - - # update chunk size - chunk_req.chunk.chunk_length_bytes = int(size_mb * 1e6) - self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req - self.chunk_store.finish_download(chunk_req.chunk.chunk_id) + + def fn(chunk_req, size_mb): + fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) + os.system(f"dd if=/dev/zero of={fpath} bs=1M count={size_mb}") + + # update chunk size + chunk_req.chunk.chunk_length_bytes = int(size_mb * 1e6) + self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req + self.chunk_store.finish_download(chunk_req.chunk.chunk_id) + + threading.Thread(target=fn, args=(chunk_req, size_mb)).start() elif current_hop.chunk_location_type == "relay" or current_hop.chunk_location_type == "save_local": # do nothing, waiting for chunk to be be ready_to_upload continue diff --git a/skylark/gateway/gateway_reciever.py b/skylark/gateway/gateway_reciever.py index bad065729..a0d426eac 100644 --- a/skylark/gateway/gateway_reciever.py +++ b/skylark/gateway/gateway_reciever.py @@ -21,8 +21,12 @@ class GatewayReciever: def __init__(self, chunk_store: ChunkStore, server_blk_size=4096 * 16): self.chunk_store = chunk_store self.server_blk_size = server_blk_size + + # shared state + self.manager = Manager() self.server_processes = [] self.server_ports = [] + self.held_ports = set() @staticmethod def checksum_sha256(path: PathLike) -> str: @@ -34,10 +38,14 @@ def checksum_sha256(path: PathLike) -> str: return hashstr def get_free_port(self): - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.bind(("", 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - return s.getsockname()[1] + while True: + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + port = s.getsockname()[1] + if port not in self.held_ports: + self.held_ports.add(port) + return port def start_server(self): # todo a good place to add backpressure? @@ -92,6 +100,7 @@ def stop_server(self, port: int): matched_process.terminate() self.server_processes.remove(matched_process) self.server_ports.remove(port) + self.held_ports.remove(port) logger.warning(f"[server] Stopped server (port = {port})") return port diff --git a/skylark/gateway/gateway_sender.py b/skylark/gateway/gateway_sender.py index 972952707..e8426118f 100644 --- a/skylark/gateway/gateway_sender.py +++ b/skylark/gateway/gateway_sender.py @@ -119,7 +119,3 @@ def send_chunks(self, chunk_ids: List[int], dst_host="127.0.0.1"): # close server response = requests.delete(f"http://{dst_host}:8080/api/v1/servers/{dst_port}") assert response.status_code == 200 and response.json() == {"status": "ok"}, response.json() - - # move chunk_reqs from downloaded to uploaded - for chunk_req in chunk_reqs: - self.chunk_store.mark_chunk_request_uploaded(chunk_req) diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index fb895916b..a76c6a223 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -103,11 +103,11 @@ def exit_handler(): # monitor the replication job until it is complete with tqdm(total=args.n_chunks * args.chunk_size_mb, unit="MB", desc="Replication progress") as pbar: while True: - dst_objs = list(s3_interface_dst.list_objects(prefix=args.key_prefix)) - pbar.update(len(dst_objs) * args.chunk_size_mb - pbar.n) - if len(dst_objs) == args.n_chunks: + dst_copied = int(rc.bound_paths[0][1].run_command(f"du -s /dev/shm/skylark/chunks")[0]) / 1e6 + pbar.update(dst_copied - pbar.n) + if dst_copied == args.n_chunks * args.chunk_size_mb: break - time.sleep(0.5) + time.sleep(0.25) # deprovision the gateway instances logger.info("Deprovisioning gateway instances") From 9559aa0776bb941217fdf01f477a89a62b7a894d Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Fri, 31 Dec 2021 10:19:53 +0000 Subject: [PATCH 6/9] update --- skylark/test/test_replicator_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index a76c6a223..0c6e6c1d2 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -103,7 +103,7 @@ def exit_handler(): # monitor the replication job until it is complete with tqdm(total=args.n_chunks * args.chunk_size_mb, unit="MB", desc="Replication progress") as pbar: while True: - dst_copied = int(rc.bound_paths[0][1].run_command(f"du -s /dev/shm/skylark/chunks")[0]) / 1e6 + dst_copied = int(rc.bound_paths[0][1].run_command(f"cd /dev/shm/skylark/chunks && du -s")[0]) / 1e6 pbar.update(dst_copied - pbar.n) if dst_copied == args.n_chunks * args.chunk_size_mb: break From 9aa520eefd420b5f7b8e70f214616a80eda5f232 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Sat, 1 Jan 2022 01:11:18 +0000 Subject: [PATCH 7/9] Fix bug with incorrect length of random files --- environment.yml | 1 + setup.py | 3 +- skylark/__init__.py | 15 +++++++++ skylark/gateway/chunk_store.py | 1 - skylark/gateway/gateway_daemon.py | 13 ++++--- skylark/gateway/gateway_daemon_api.py | 22 ++---------- skylark/gateway/gateway_reciever.py | 45 +++++++++++-------------- skylark/gateway/gateway_sender.py | 15 +++------ skylark/gateway/wire_protocol_header.py | 1 - skylark/replicate/replicator_client.py | 18 ++++++---- skylark/test/test_replicator_client.py | 33 +++++++++--------- 11 files changed, 83 insertions(+), 84 deletions(-) diff --git a/environment.yml b/environment.yml index a17ba21f0..8e08c00fd 100644 --- a/environment.yml +++ b/environment.yml @@ -23,5 +23,6 @@ dependencies: - paramiko - questionary - ray + - setproctitle - tqdm - werkzeug \ No newline at end of file diff --git a/setup.py b/setup.py index 7fb0bab6c..4f11199b5 100644 --- a/setup.py +++ b/setup.py @@ -18,11 +18,12 @@ "graphviz", "loguru", "matplotlib", - "ray", "numpy", "pandas", "paramiko", "questionary", + "ray", + "setproctitle", "tqdm", "werkzeug", ], diff --git a/skylark/__init__.py b/skylark/__init__.py index 6f8587d24..532ba46c6 100644 --- a/skylark/__init__.py +++ b/skylark/__init__.py @@ -1,6 +1,21 @@ +import os +import sys from pathlib import Path skylark_root = Path(__file__).parent.parent data_root = skylark_root / "data" instance_log_root = skylark_root / "data" / "logs" key_root = skylark_root / "data" / "keys" + + +def print_header(): + header = """================================================= + ______ _ _ _ + / _____)| | | | | | +( (____ | | _ _ _ | | _____ ____ | | _ + \____ \ | |_/ )| | | || | (____ | / ___)| |_/ ) + _____) )| _ ( | |_| || | / ___ || | | _ ( +(______/ |_| \_) \__ | \_)\_____||_| |_| \_) + (____/ +=================================================""" + print(header, flush=True) diff --git a/skylark/gateway/chunk_store.py b/skylark/gateway/chunk_store.py index 6f30655c6..7a128cdab 100644 --- a/skylark/gateway/chunk_store.py +++ b/skylark/gateway/chunk_store.py @@ -108,7 +108,6 @@ def get_chunk_status(self, chunk_id: int) -> Optional[ChunkState]: return self.chunk_status[chunk_id] if chunk_id in self.chunk_status else None def set_chunk_status(self, chunk_id: int, new_status: ChunkState): - logger.debug(f"Setting chunk {chunk_id} status to {new_status}") self.chunk_status[chunk_id] = new_status def start_download(self, chunk_id: int): diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index cf224bb17..963f354c1 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -13,10 +13,12 @@ from loguru import logger from skylark.gateway.chunk_store import ChunkRequest, ChunkState, ChunkStore +import setproctitle from skylark.gateway.gateway_reciever import GatewayReciever from skylark.gateway.gateway_daemon_api import GatewayDaemonAPI from skylark.gateway.gateway_sender import GatewaySender +from skylark import print_header class GatewayDaemon: @@ -24,7 +26,9 @@ def __init__(self, chunk_dir: PathLike, debug=False, log_dir: Optional[PathLike] if log_dir is not None: log_dir = Path(log_dir) log_dir.mkdir(exist_ok=True) - logger.add(log_dir / "gateway_daemon.log", rotation="10 MB") + logger.remove() + logger.add(log_dir / "gateway_daemon.log", rotation="10 MB", enqueue=True) + logger.add(sys.stderr, colorize=True, format="{function:>15}:{line:<3} {level:<8} {message}", level="DEBUG") self.chunk_store = ChunkStore(chunk_dir) self.gateway_reciever = GatewayReciever(chunk_store=self.chunk_store) self.gateway_sender = GatewaySender(chunk_store=self.chunk_store, n_processes=outgoing_connections, batch_size=outgoing_batch_size) @@ -40,6 +44,7 @@ def cleanup(self): self.api_server.shutdown() def run(self): + setproctitle.setproctitle(f"skylark-gateway-daemon") exit_flag = Event() def exit_handler(signum, frame): @@ -96,9 +101,7 @@ def exit_handler(signum, frame): def fn(chunk_req, size_mb): fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) os.system(f"dd if=/dev/zero of={fpath} bs=1M count={size_mb}") - - # update chunk size - chunk_req.chunk.chunk_length_bytes = int(size_mb * 1e6) + chunk_req.chunk.chunk_length_bytes = os.path.getsize(fpath) self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req self.chunk_store.finish_download(chunk_req.chunk.chunk_id) @@ -115,12 +118,14 @@ def fn(chunk_req, size_mb): if __name__ == "__main__": + print_header() parser = argparse.ArgumentParser(description="Skylark Gateway Daemon") parser.add_argument("--chunk-dir", type=Path, default="/dev/shm/skylark/chunks", required=True, help="Directory to store chunks") parser.add_argument("--debug", action="store_true", help="Enable debug mode for Flask") parser.add_argument("--log-dir", type=Path, default=Path("/var/log/skylark"), help="Directory to write logs to") parser.add_argument("--outgoing-connections", type=int, default=1, help="Number of outgoing connections to make to the next relay") args = parser.parse_args() + daemon = GatewayDaemon( chunk_dir=args.chunk_dir, debug=args.debug, log_dir=Path(args.log_dir), outgoing_connections=args.outgoing_connections ) diff --git a/skylark/gateway/gateway_daemon_api.py b/skylark/gateway/gateway_daemon_api.py index 18e3d31ea..122da9e4c 100644 --- a/skylark/gateway/gateway_daemon_api.py +++ b/skylark/gateway/gateway_daemon_api.py @@ -4,9 +4,11 @@ from pathlib import Path import threading from typing import List +import subprocess from flask import Flask, jsonify, request, send_from_directory from werkzeug.serving import make_server +import setproctitle from skylark.gateway.chunk_store import ChunkRequest, ChunkState, ChunkStore from skylark.gateway.gateway_reciever import GatewayReciever @@ -53,6 +55,7 @@ def __init__(self, chunk_store: ChunkStore, gateway_reciever: GatewayReciever, h self.server = make_server(host, port, self.app, threaded=True) def run(self): + setproctitle.setproctitle(f"skylark-gateway-daemon-api") self.server.serve_forever() def shutdown(self): @@ -85,25 +88,6 @@ def shutdown(): self.shutdown() return jsonify({"status": "ok"}) - # serve directory listing of /var/logs/skylark on /api/v1/logs - @self.app.route("/api/v1/logs", methods=["GET"]) - def get_logs(): - if self.log_dir: - out = "" - for f in os.listdir("/var/log/skylark"): - out += f"{f}
" - return out - else: - return "No log directory set", 400 - - # serve log file on /api/v1/logs/ - @self.app.route("/api/v1/logs/", methods=["GET"]) - def get_log(path): - if self.log_dir: - return send_from_directory("/var/log/skylark", path, as_attachment=False, mimetype="text/plain") - else: - return "No log directory set", 400 - def register_server_routes(self): # list running gateway servers w/ ports @self.app.route("/api/v1/servers", methods=["GET"]) diff --git a/skylark/gateway/gateway_reciever.py b/skylark/gateway/gateway_reciever.py index a0d426eac..94ad2caed 100644 --- a/skylark/gateway/gateway_reciever.py +++ b/skylark/gateway/gateway_reciever.py @@ -11,6 +11,7 @@ import requests from loguru import logger +import setproctitle from skylark.gateway.chunk_store import ChunkRequest, ChunkStore from skylark.gateway.wire_protocol_header import WireProtocolHeader @@ -26,7 +27,6 @@ def __init__(self, chunk_store: ChunkStore, server_blk_size=4096 * 16): self.manager = Manager() self.server_processes = [] self.server_ports = [] - self.held_ports = set() @staticmethod def checksum_sha256(path: PathLike) -> str: @@ -37,30 +37,23 @@ def checksum_sha256(path: PathLike) -> str: assert len(hashstr) == 64 return hashstr - def get_free_port(self): - while True: - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.bind(("", 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - port = s.getsockname()[1] - if port not in self.held_ports: - self.held_ports.add(port) - return port - def start_server(self): # todo a good place to add backpressure? started_event = Event() + port = Value("i", 0) - def server_worker(port): + def server_worker(): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: - sock.bind(("0.0.0.0", port)) - port = sock.getsockname()[1] + sock.bind(("0.0.0.0", 0)) + socket_port = sock.getsockname()[1] + port.value = socket_port exit_flag = Value("i", 0) def signal_handler(signal, frame): exit_flag.value = 1 signal.signal(signal.SIGINT, signal_handler) + setproctitle.setproctitle(f"skylark-gateway-reciever:{socket_port}") sock.listen() sock.setblocking(False) @@ -75,16 +68,15 @@ def signal_handler(signal, frame): conn, addr = sock.accept() chunks_received = self.recv_chunks(conn, addr) conn.close() - logger.info(f"[server] Received {len(chunks_received)} chunks") + logger.debug(f"[reciver] {chunks_received} chunks received") - port = self.get_free_port() - p = Process(target=server_worker, args=(port,)) - self.server_processes.append(p) - self.server_ports.append(port) + p = Process(target=server_worker) p.start() started_event.wait() - logger.info(f"[server] Started server (port = {port})") - return port + self.server_processes.append(p) + self.server_ports.append(port.value) + logger.info(f"[server] Started server (port = {port.value})") + return port.value def stop_server(self, port: int): matched_process = None @@ -100,7 +92,6 @@ def stop_server(self, port: int): matched_process.terminate() self.server_processes.remove(matched_process) self.server_ports.remove(port) - self.held_ports.remove(port) logger.warning(f"[server] Stopped server (port = {port})") return port @@ -111,7 +102,7 @@ def stop_servers(self): assert len(self.server_processes) == 0 def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): - logger.info(f"[server] Connection from {addr}") + server_port = conn.getsockname()[1] chunks_received = [] while True: # receive header and write data to file @@ -127,14 +118,18 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): f.write(data) chunk_data_size -= len(data) chunk_received_size += len(data) + logger.debug( + f"[reciever:{server_port}] {chunk_header.chunk_id} chunk received {chunk_received_size}/{chunk_header.chunk_len}" + ) # check hash, update status and close socket if transfer is complete # todo write checksums upon read from object store # if self.checksum_sha256(chunk_file_path) != chunk_header.chunk_hash_sha256: # raise ValueError(f"Received chunk {chunk_header.chunk_id} with invalid hash") self.chunk_store.finish_download(chunk_header.chunk_id, t.elapsed) chunks_received.append(chunk_header.chunk_id) - logger.info(f"[server] Received chunk {chunk_header.chunk_id} ({chunk_received_size} bytes) in {t.elapsed:.2f} seconds") + logger.info( + f"[reciever:{server_port}] Received chunk {chunk_header.chunk_id} ({chunk_received_size} bytes) in {t.elapsed:.2f} seconds" + ) if chunk_header.end_of_stream: - logger.info(f"[server] Received end of stream, returning") conn.close() return chunks_received diff --git a/skylark/gateway/gateway_sender.py b/skylark/gateway/gateway_sender.py index e8426118f..53325081b 100644 --- a/skylark/gateway/gateway_sender.py +++ b/skylark/gateway/gateway_sender.py @@ -1,17 +1,14 @@ -import hashlib -import os import queue -import select -import signal import socket from contextlib import closing from multiprocessing import Event, Manager, Process, Value -from pathlib import Path +import sys import time from typing import List, Tuple import requests from loguru import logger +import setproctitle from skylark.gateway.chunk_store import ChunkRequest, ChunkStore from skylark.gateway.wire_protocol_header import WireProtocolHeader @@ -45,6 +42,7 @@ def stop_workers(self): self.processes = [] def worker_loop(self, id: int): + setproctitle.setproctitle(f"skylark-gateway-sender:{id}") while not self.exit_flags[id].is_set(): # get up to pipeline_batch_size chunks from the queue chunk_ids_to_send = [] @@ -91,26 +89,23 @@ def send_chunks(self, chunk_ids: List[int], dst_host="127.0.0.1"): assert response.status_code == 200 dst_port = int(response.json()["server_port"]) - logger.info(f"sending {len(chunk_ids)} chunks to {dst_host}:{dst_port}") with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: sock.connect((dst_host, dst_port)) for idx, chunk_id in enumerate(chunk_ids): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1) # disable Nagle's algorithm - logger.warning(f"[sender] Sending chunk {chunk_id} to {dst_host}:{dst_port}") + logger.warning(f"[sender -> {dst_port}] Sending chunk {chunk_id} to {dst_host}:{dst_port}") chunk = self.chunk_store.get_chunk_request(chunk_id).chunk # send chunk header chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_id) header = chunk.to_wire_header(end_of_stream=idx == len(chunk_ids) - 1) sock.sendall(header.to_bytes()) - logger.debug(f"[sender] Sent chunk header {header}") # send chunk data assert chunk_file_path.exists(), f"chunk file {chunk_file_path} does not exist" with open(chunk_file_path, "rb") as fd: - logger.debug(f"[sender] Sending file") bytes_sent = sock.sendfile(fd) - logger.debug(f"[sender] Sent chunk data {bytes_sent} bytes") + logger.debug(f"[sender -> {dst_port}] Sent {bytes_sent} bytes of data") self.chunk_store.finish_upload(chunk_id) chunk_file_path.unlink() diff --git a/skylark/gateway/wire_protocol_header.py b/skylark/gateway/wire_protocol_header.py index eac3593a1..b89c72a05 100644 --- a/skylark/gateway/wire_protocol_header.py +++ b/skylark/gateway/wire_protocol_header.py @@ -47,7 +47,6 @@ def to_bytes(self): @staticmethod def from_socket(sock: socket.socket): num_bytes = WireProtocolHeader.length_bytes() - logger.info(f"Reading {num_bytes} bytes from socket") header_bytes = sock.recv(num_bytes) assert len(header_bytes) == num_bytes, f"{len(header_bytes)} != {num_bytes}" return WireProtocolHeader.from_bytes(header_bytes) diff --git a/skylark/replicate/replicator_client.py b/skylark/replicate/replicator_client.py index f1fa7677b..433f1a5c3 100644 --- a/skylark/replicate/replicator_client.py +++ b/skylark/replicate/replicator_client.py @@ -67,12 +67,18 @@ def start_gateway_instance(self, server: Server): assert "Status: Downloaded newer image" in docker_out or "Status: Image is up to date" in docker_out, docker_out # delete old gateway containers that are not self.gateway_docker_image - out, err = server.run_command("sudo docker ps -a -q") - for container_id in out.splitlines(): - container_image = server.run_command(f"sudo docker inspect --format='{{.Config.Image}}' {container_id}")[0] - if container_image != self.gateway_docker_image: - logger.info(f"Deleting old container {container_id}") - server.run_command(f"sudo docker rm {container_id}") + server.run_command(f"sudo docker kill $(sudo docker ps -q)") + server.run_command(f"sudo docker rm -f $(sudo docker ps -a -q)") + + # launch dozzle log viewer + server.run_command( + f"sudo docker run --name dozzle -d --volume=/var/run/docker.sock:/var/run/docker.sock -p 8888:8080 amir20/dozzle:latest --filter name=skylark_gateway" + ) + + # launch glances web interface to monitor CPU and memory usage + server.run_command( + f"sudo docker run --name glances -d -p 61208-61209:61208-61209 -e GLANCES_OPT='-w' -v /var/run/docker.sock:/var/run/docker.sock:ro --pid host nicolargo/glances:latest-full" + ) docker_run_flags = "-d --log-driver=local --ipc=host --network=host" # todo add other launch flags for gateway daemon diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index 0c6e6c1d2..484008d48 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -81,11 +81,10 @@ def main(args): rc.provision_gateways(reuse_instances=True, log_dir=args.log_dir, authorize_ssh_pub_key=args.copy_ssh_key) rc.start_gateways() - def exit_handler(): - logger.warning("Exiting, closing gateways") - rc.kill_gateways() - - atexit.register(exit_handler) + # def exit_handler(): + # logger.warning("Exiting, closing gateways") + # rc.kill_gateways() + # atexit.register(exit_handler) # run the replication job logger.debug(f"Source gateway API endpoint: http://{rc.bound_paths[0][0].public_ip}:8080/api/v1") @@ -100,18 +99,18 @@ def exit_handler(): ) rc.run_replication_plan(job) - # monitor the replication job until it is complete - with tqdm(total=args.n_chunks * args.chunk_size_mb, unit="MB", desc="Replication progress") as pbar: - while True: - dst_copied = int(rc.bound_paths[0][1].run_command(f"cd /dev/shm/skylark/chunks && du -s")[0]) / 1e6 - pbar.update(dst_copied - pbar.n) - if dst_copied == args.n_chunks * args.chunk_size_mb: - break - time.sleep(0.25) - - # deprovision the gateway instances - logger.info("Deprovisioning gateway instances") - rc.deprovision_gateways() + # # monitor the replication job until it is complete + # with tqdm(total=args.n_chunks * args.chunk_size_mb, unit="MB", desc="Replication progress") as pbar: + # while True: + # dst_copied = int(str(rc.bound_paths[0][1].run_command(f"cd /dev/shm/skylark/chunks && du -s")[0]).strip()) / 1e6 + # pbar.update(dst_copied - pbar.n) + # if dst_copied == args.n_chunks * args.chunk_size_mb: + # break + # time.sleep(0.25) + + # # deprovision the gateway instances + # logger.info("Deprovisioning gateway instances") + # rc.deprovision_gateways() if __name__ == "__main__": From 228e097f6bc39f0793df6e9a24a5dd0a14e585d3 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Sat, 1 Jan 2022 01:25:40 +0000 Subject: [PATCH 8/9] format --- skylark/compute/aws/aws_cloud_provider.py | 13 ++++++++++++- skylark/replicate/replicator_client.py | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/skylark/compute/aws/aws_cloud_provider.py b/skylark/compute/aws/aws_cloud_provider.py index c7e92da69..7a80e7c74 100644 --- a/skylark/compute/aws/aws_cloud_provider.py +++ b/skylark/compute/aws/aws_cloud_provider.py @@ -112,7 +112,7 @@ def get_ubuntu_ami_id(region: str) -> str: return image_list[0]["ImageId"] def provision_instance( - self, region: str, instance_class: str, name: Optional[str] = None, ami_id: Optional[str] = None, tags={"skylark": "true"} + self, region: str, instance_class: str, name: Optional[str] = None, ami_id: Optional[str] = None, tags={"skylark": "true"}, ebs_volume_size: int = 128 ) -> AWSServer: assert not region.startswith("aws:"), "Region should be AWS region" if name is None: @@ -121,6 +121,7 @@ def provision_instance( ami_id = self.get_ubuntu_ami_id(region) ec2 = AWSServer.get_boto3_resource("ec2", region) AWSServer.make_keyfile(region) + # set instance storage to 128GB EBS instance = ec2.create_instances( ImageId=ami_id, InstanceType=instance_class, @@ -133,6 +134,16 @@ def provision_instance( "Tags": [{"Key": "Name", "Value": name}] + [{"Key": k, "Value": v} for k, v in tags.items()], } ], + BlockDeviceMappings=[ + { + "DeviceName": "/dev/sda1", + "Ebs": { + "DeleteOnTermination": True, + "VolumeSize": ebs_volume_size, + "VolumeType": "gp2", + }, + } + ], ) server = AWSServer(f"aws:{region}", instance[0].id) server.wait_for_ready() diff --git a/skylark/replicate/replicator_client.py b/skylark/replicate/replicator_client.py index 433f1a5c3..f6648b6ec 100644 --- a/skylark/replicate/replicator_client.py +++ b/skylark/replicate/replicator_client.py @@ -64,7 +64,7 @@ def start_gateway_instance(self, server: Server): assert "Hello from Docker!" in out server.run_command("sudo docker kill $(docker ps -q)") docker_out, docker_err = server.run_command(f"sudo docker pull {self.gateway_docker_image}") - assert "Status: Downloaded newer image" in docker_out or "Status: Image is up to date" in docker_out, docker_out + assert "Status: Downloaded newer image" in docker_out or "Status: Image is up to date" in docker_out, (docker_out, docker_err) # delete old gateway containers that are not self.gateway_docker_image server.run_command(f"sudo docker kill $(sudo docker ps -q)") From e88f9d89d763ecd6e3821f5055be9bf0264f1bd3 Mon Sep 17 00:00:00 2001 From: Paras Jain Date: Fri, 31 Dec 2021 17:28:06 -0800 Subject: [PATCH 9/9] Optimize imports --- skylark/__init__.py | 2 -- skylark/gateway/chunk_store.py | 2 +- skylark/gateway/gateway_daemon.py | 15 +++++++-------- skylark/gateway/gateway_daemon_api.py | 9 +++------ skylark/gateway/gateway_reciever.py | 8 +++----- skylark/gateway/gateway_sender.py | 9 +++------ skylark/gateway/wire_protocol_header.py | 2 -- skylark/replicate/obj_store.py | 4 +--- skylark/test/test_gateway_daemon.py | 3 +-- skylark/test/test_replicator_client.py | 6 ------ 10 files changed, 19 insertions(+), 41 deletions(-) diff --git a/skylark/__init__.py b/skylark/__init__.py index 532ba46c6..73c8e0622 100644 --- a/skylark/__init__.py +++ b/skylark/__init__.py @@ -1,5 +1,3 @@ -import os -import sys from pathlib import Path skylark_root = Path(__file__).parent.parent diff --git a/skylark/gateway/chunk_store.py b/skylark/gateway/chunk_store.py index 7a128cdab..ae53d508a 100644 --- a/skylark/gateway/chunk_store.py +++ b/skylark/gateway/chunk_store.py @@ -3,7 +3,7 @@ from multiprocessing import Manager from os import PathLike from pathlib import Path -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional from loguru import logger diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index 963f354c1..bc0a95c45 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -1,24 +1,23 @@ import argparse import atexit -from multiprocessing import Event import os import re import signal -from os import PathLike -from pathlib import Path import sys import threading -import time +from multiprocessing import Event +from os import PathLike +from pathlib import Path from typing import Optional -from loguru import logger -from skylark.gateway.chunk_store import ChunkRequest, ChunkState, ChunkStore import setproctitle +from loguru import logger -from skylark.gateway.gateway_reciever import GatewayReciever +from skylark import print_header +from skylark.gateway.chunk_store import ChunkState, ChunkStore from skylark.gateway.gateway_daemon_api import GatewayDaemonAPI +from skylark.gateway.gateway_reciever import GatewayReciever from skylark.gateway.gateway_sender import GatewaySender -from skylark import print_header class GatewayDaemon: diff --git a/skylark/gateway/gateway_daemon_api.py b/skylark/gateway/gateway_daemon_api.py index 122da9e4c..fd52b534d 100644 --- a/skylark/gateway/gateway_daemon_api.py +++ b/skylark/gateway/gateway_daemon_api.py @@ -1,14 +1,11 @@ import logging import logging.handlers -import os -from pathlib import Path import threading -from typing import List -import subprocess +from pathlib import Path -from flask import Flask, jsonify, request, send_from_directory -from werkzeug.serving import make_server import setproctitle +from flask import Flask, jsonify, request +from werkzeug.serving import make_server from skylark.gateway.chunk_store import ChunkRequest, ChunkState, ChunkStore from skylark.gateway.gateway_reciever import GatewayReciever diff --git a/skylark/gateway/gateway_reciever.py b/skylark/gateway/gateway_reciever.py index 94ad2caed..07fc1d36c 100644 --- a/skylark/gateway/gateway_reciever.py +++ b/skylark/gateway/gateway_reciever.py @@ -1,19 +1,17 @@ import hashlib import os -import queue import select import signal import socket from contextlib import closing from multiprocessing import Event, Manager, Process, Value from pathlib import Path -from typing import List, Tuple +from typing import Tuple -import requests -from loguru import logger import setproctitle +from loguru import logger -from skylark.gateway.chunk_store import ChunkRequest, ChunkStore +from skylark.gateway.chunk_store import ChunkStore from skylark.gateway.wire_protocol_header import WireProtocolHeader from skylark.utils import PathLike, Timer diff --git a/skylark/gateway/gateway_sender.py b/skylark/gateway/gateway_sender.py index 53325081b..bdfc900ae 100644 --- a/skylark/gateway/gateway_sender.py +++ b/skylark/gateway/gateway_sender.py @@ -1,18 +1,15 @@ import queue import socket +import time from contextlib import closing from multiprocessing import Event, Manager, Process, Value -import sys -import time -from typing import List, Tuple +from typing import List import requests -from loguru import logger import setproctitle +from loguru import logger from skylark.gateway.chunk_store import ChunkRequest, ChunkStore -from skylark.gateway.wire_protocol_header import WireProtocolHeader -from skylark.utils import PathLike, Timer class GatewaySender: diff --git a/skylark/gateway/wire_protocol_header.py b/skylark/gateway/wire_protocol_header.py index b89c72a05..f709aeea7 100644 --- a/skylark/gateway/wire_protocol_header.py +++ b/skylark/gateway/wire_protocol_header.py @@ -1,8 +1,6 @@ import socket from dataclasses import dataclass -from loguru import logger - @dataclass class WireProtocolHeader: diff --git a/skylark/replicate/obj_store.py b/skylark/replicate/obj_store.py index cba3cafb9..d198e0c24 100644 --- a/skylark/replicate/obj_store.py +++ b/skylark/replicate/obj_store.py @@ -3,13 +3,11 @@ from concurrent.futures import Future from typing import Iterator, List +import botocore.exceptions from awscrt.auth import AwsCredentialsProvider from awscrt.http import HttpHeaders, HttpRequest from awscrt.io import ClientBootstrap, DefaultHostResolver, EventLoopGroup from awscrt.s3 import S3Client, S3RequestTlsMode, S3RequestType -import boto3 -import botocore.exceptions - from skylark.compute.aws.aws_server import AWSServer diff --git a/skylark/test/test_gateway_daemon.py b/skylark/test/test_gateway_daemon.py index 54d487f61..a5560ed8a 100644 --- a/skylark/test/test_gateway_daemon.py +++ b/skylark/test/test_gateway_daemon.py @@ -1,12 +1,11 @@ -import atexit from pathlib import Path from loguru import logger + from skylark.gateway.chunk_store import Chunk, ChunkRequest, ChunkRequestHop, ChunkState from skylark.gateway.gateway_daemon import GatewayDaemon from skylark.replicate.obj_store import S3Interface - if __name__ == "__main__": daemon = GatewayDaemon("/dev/shm/skylark/chunks", debug=True) diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index 484008d48..f7262994e 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -1,12 +1,6 @@ import argparse -import atexit -import concurrent.futures -import os -import tempfile -import time from loguru import logger -from tqdm import tqdm, trange from skylark.replicate.obj_store import S3Interface from skylark.replicate.replication_plan import ReplicationJob, ReplicationTopology