Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Client] Cancel pending chunk requests by closing gateways #70

Merged
merged 1 commit into from
Jan 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions scripts/test_gateway.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
set -x
sudo docker build -t gateway_test .
sudo docker run --rm --ipc=host --network=host --name=skylark_gateway gateway_test /env/bin/python /pkg/skylark/gateway/gateway_daemon.py --chunk-dir /dev/shm/skylark_test/chunks
sudo DOCKER_BUILDKIT=1 docker build -t gateway_test .
sudo docker run --rm --ipc=host --network=host --name=skylark_gateway gateway_test python /pkg/skylark/gateway/gateway_daemon.py --chunk-dir /dev/shm/skylark_test/chunks
6 changes: 4 additions & 2 deletions skylark/gateway/gateway_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ def __init__(self, chunk_dir: PathLike, debug=False, log_dir: Optional[PathLike]
self.gateway_sender = GatewaySender(chunk_store=self.chunk_store, n_processes=outgoing_connections)

# API server
self.api_server = GatewayDaemonAPI(self.chunk_store, self.gateway_receiver, debug=debug, log_dir=log_dir)
self.api_server.start()
atexit.register(self.cleanup)
self.api_server = GatewayDaemonAPI(
self.chunk_store, self.gateway_receiver, debug=debug, log_dir=log_dir, daemon_cleanup_handler=self.cleanup
)
self.api_server.start()
logger.info(f"Gateway daemon API started at {self.api_server.url}")

def cleanup(self):
Expand Down
21 changes: 19 additions & 2 deletions skylark/gateway/gateway_daemon_api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import logging.handlers
import os
import threading
from pathlib import Path

from loguru import logger
from flask import Flask, jsonify, request
from werkzeug.serving import make_server
from werkzeug import serving
Expand All @@ -27,11 +29,21 @@ class GatewayDaemonAPI(threading.Thread):
* GET /api/v1/chunk_status_log - returns list of chunk status log entries
"""

def __init__(self, chunk_store: ChunkStore, gateway_receiver: GatewayReceiver, host="0.0.0.0", port=8080, debug=False, log_dir=None):
def __init__(
self,
chunk_store: ChunkStore,
gateway_receiver: GatewayReceiver,
host="0.0.0.0",
port=8080,
debug=False,
log_dir=None,
daemon_cleanup_handler=None,
):
super().__init__()
self.app = Flask("gateway_metadata_server")
self.chunk_store = chunk_store
self.gateway_receiver = gateway_receiver
self.daemon_cleanup_handler = daemon_cleanup_handler # optional handler to run when daemon is shutting down during cleanup

# load routes
self.register_global_routes()
Expand Down Expand Up @@ -95,8 +107,13 @@ def get_status():
# shutdown route
@self.app.route("/api/v1/shutdown", methods=["POST"])
def shutdown():
logger.warning("Shutting down gateway daemon")
if self.daemon_cleanup_handler is not None:
self.daemon_cleanup_handler()
logger.warning("Shutting down API")
self.shutdown()
return jsonify({"status": "ok"})
logger.error("Shutdown complete. Hard exit.")
os._exit(1)

def register_server_routes(self):
# list running gateway servers w/ ports
Expand Down
25 changes: 19 additions & 6 deletions skylark/replicate/replicator_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import atexit
from contextlib import closing
from datetime import datetime
import itertools
import json
from logging import log
from functools import partial
from re import T
import time
from typing import Dict, List, Optional, Tuple

Expand All @@ -21,10 +17,9 @@
from skylark.compute.gcp.gcp_cloud_provider import GCPCloudProvider
from skylark.compute.server import Server, ServerState
from skylark.chunk import Chunk, ChunkRequest, ChunkRequestHop, ChunkState
from skylark.obj_store.s3_interface import S3Interface
from skylark.replicate.replication_plan import ReplicationJob, ReplicationTopology
from skylark.replicate.replicator_client_dashboard import ReplicatorClientDashboard
from skylark.utils.utils import PathLike, Timer, do_parallel, wait_for
from skylark.utils.utils import PathLike, Timer, do_parallel


class ReplicatorClient:
Expand Down Expand Up @@ -321,6 +316,7 @@ def monitor_transfer(
dash_host="0.0.0.0",
dash_port="8080",
time_limit_seconds: Optional[float] = None,
cancel_pending: bool = True,
) -> Dict:
total_bytes = sum([cr.chunk.chunk_length_bytes for cr in crs])
if serve_web_dashboard:
Expand All @@ -329,6 +325,23 @@ def monitor_transfer(
atexit.register(dash.shutdown)
logger.info(f"Web dashboard running at {dash.dashboard_url}")
last_log = None

# register an atexit handler that cancels pending chunk requests by force-shutting down every gateway
if cancel_pending:

def shutdown_handler():
def fn(s: Server):
logger.warning(f"Cancelling pending chunk requests to {s.public_ip()}")
try:
requests.post(f"http://{s.public_ip()}:8080/api/v1/shutdown")
except requests.exceptions.ConnectionError as e:
return # ignore connection errors since server may be shutting down

do_parallel(fn, [hop for path in self.bound_paths for hop in path], n=-1)
logger.warning("Cancelled pending chunk requests")

atexit.register(shutdown_handler)

with Timer() as t:
with tqdm(
total=total_bytes * 8, desc="Replication", unit="bit", unit_scale=True, unit_divisor=KB, disable=not show_pbar
Expand Down