Fix strange straggler issues #158

Merged · 6 commits · Feb 22, 2022
2 files renamed without changes.
4 changes: 3 additions & 1 deletion skylark/chunk.py
```diff
@@ -136,7 +136,9 @@ def to_bytes(self):
     @staticmethod
     def from_socket(sock: socket.socket):
         num_bytes = WireProtocolHeader.length_bytes()
-        header_bytes = sock.recv(num_bytes)
+        header_bytes = b""
+        while len(header_bytes) < num_bytes:
+            header_bytes += sock.recv(num_bytes - len(header_bytes))
         assert len(header_bytes) == num_bytes, f"{len(header_bytes)} != {num_bytes}"
         return WireProtocolHeader.from_bytes(header_bytes)
```

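This hunk is the heart of the straggler fix: a single `sock.recv(n)` may legally return fewer than `n` bytes, so the old code could hand a truncated buffer to the assert and stall a transfer. A standalone sketch of the same pattern; the helper name and the closed-socket guard are my additions, not part of the PR:

```python
import socket

def recv_exactly(sock: socket.socket, num_bytes: int) -> bytes:
    """Loop over recv() until exactly num_bytes arrive; TCP is free to deliver partial reads."""
    buf = b""
    while len(buf) < num_bytes:
        chunk = sock.recv(num_bytes - len(buf))
        if not chunk:  # peer closed the connection mid-message
            raise ConnectionError(f"socket closed after {len(buf)} of {num_bytes} bytes")
        buf += chunk
    return buf
```

The PR's version relies on the assert for error reporting; the empty-read guard above additionally catches a peer that disconnects mid-header.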
2 changes: 1 addition & 1 deletion skylark/cli/cli.py
```diff
@@ -163,7 +163,7 @@ def replicate_random(
     # rc.provision_gateways(reuse_gateways, log_dir="/tmp/log_skylark") for debugging and writing log back to local machine at the dir provided
     rc.provision_gateways(reuse_gateways)
     for node, gw in rc.bound_nodes.items():
-        logger.info(f"Provisioned {node}: {gw.gateway_log_viewer_url}")
+        logger.info(f"Provisioned {node}: {gw.gateway_log_viewer_url}, {gw.gateway_api_url}/incomplete_chunk_requests")
 
     if total_transfer_size_mb % chunk_size_mb != 0:
         logger.warning(f"total_transfer_size_mb ({total_transfer_size_mb}) is not a multiple of chunk_size_mb ({chunk_size_mb})")
```
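Logging the gateway's `/incomplete_chunk_requests` endpoint next to the log-viewer URL gives a one-click way to check for stragglers during a transfer; judging by the name, it lists chunk requests that have not yet completed on that gateway.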
2 changes: 1 addition & 1 deletion skylark/cli/cli_aws.py
```diff
@@ -48,7 +48,7 @@ def ssh(region: Optional[str] = None):
     instance_name: AWSServer = questionary.select("Select an instance", choices=choices).ask()
     if instance_name is not None and instance_name in instance_map:
         instance = instance_map[instance_name]
-        proc = subprocess.Popen(split(f"ssh -i {str(instance.local_keyfile)} ubuntu@{instance.public_ip()}"))
+        proc = subprocess.Popen(split(f"ssh -i {str(instance.local_keyfile)} ec2-user@{instance.public_ip()}"))
         proc.wait()
     else:
         typer.secho(f"No instance selected", fg="red")
```
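The gateways boot the ECS-optimized Amazon Linux 2 AMI (see the `ImageId` in `aws_cloud_provider.py` below), whose default SSH user is `ec2-user`; the `ubuntu` login appears to be a leftover from an earlier image.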
6 changes: 3 additions & 3 deletions skylark/cli/cli_helper.py
```diff
@@ -12,7 +12,7 @@
 from skylark.compute.aws.aws_cloud_provider import AWSCloudProvider
 from skylark.compute.azure.azure_cloud_provider import AzureCloudProvider
 from skylark.compute.gcp.gcp_cloud_provider import GCPCloudProvider
-from skylark.obj_store.object_store_interface import ObjectStoreObject
+from skylark.obj_store.object_store_interface import ObjectStoreInterface, ObjectStoreObject
 from skylark.obj_store.s3_interface import S3Interface
 from skylark.obj_store.gcs_interface import GCSInterface
 from skylark.obj_store.azure_interface import AzureInterface
@@ -89,7 +89,7 @@ def copy_local_local(src: Path, dst: Path):
     copyfile(src, dst)
 
 
-def copy_local_objstore(object_interface: ObjectStoreObject, src: Path, dst_bucket: str, dst_key: str):
+def copy_local_objstore(object_interface: ObjectStoreInterface, src: Path, dst_bucket: str, dst_key: str):
     ops: List[concurrent.futures.Future] = []
     path_mapping: Dict[concurrent.futures.Future, Path] = {}
 
@@ -113,7 +113,7 @@ def _copy(path: Path, dst_key: str, total_size=0.0):
             pbar.update(path_mapping[op].stat().st_size)
 
 
-def copy_objstore_local(object_interface: ObjectStoreObject, src_bucket: str, src_key: str, dst: Path):
+def copy_objstore_local(object_interface: ObjectStoreInterface, src_bucket: str, src_key: str, dst: Path):
     ops: List[concurrent.futures.Future] = []
     obj_mapping: Dict[concurrent.futures.Future, ObjectStoreObject] = {}
```
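This is an annotation fix rather than a behavior change: these helpers take the store interface that performs uploads and downloads (`ObjectStoreInterface`, which `S3Interface` and friends implement), while `ObjectStoreObject` describes a single stored object and remains the right type for `obj_mapping`.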
85 changes: 38 additions & 47 deletions skylark/compute/aws/aws_cloud_provider.py
```diff
@@ -1,7 +1,4 @@
-import random
-import time
-import uuid
 from functools import lru_cache
 from typing import List, Optional
 
 import botocore
@@ -12,6 +9,7 @@
 from skylark import skylark_root
 from skylark.compute.aws.aws_server import AWSServer
 from skylark.compute.cloud_providers import CloudProvider
+from skylark.utils.utils import retry_backoff
 
 
 class AWSCloudProvider(CloudProvider):
@@ -227,47 +225,40 @@ def provision_instance(
         subnets = list(vpc.subnets.all())
         assert len(subnets) > 0, "No subnets found"
 
-        for i in range(4):
-            try:
-                # todo instance-initiated-shutdown-behavior terminate
-                instance = ec2.create_instances(
-                    ImageId="resolve:ssm:/aws/service/ecs/optimized-ami/amazon-linux-2/recommended/image_id",
-                    InstanceType=instance_class,
-                    MinCount=1,
-                    MaxCount=1,
-                    KeyName=f"skylark-{region}",
-                    TagSpecifications=[
-                        {
-                            "ResourceType": "instance",
-                            "Tags": [{"Key": "Name", "Value": name}] + [{"Key": k, "Value": v} for k, v in tags.items()],
-                        }
-                    ],
-                    BlockDeviceMappings=[
-                        {
-                            "DeviceName": "/dev/sda1",
-                            "Ebs": {"DeleteOnTermination": True, "VolumeSize": ebs_volume_size, "VolumeType": "gp2"},
-                        }
-                    ],
-                    NetworkInterfaces=[
-                        {
-                            "DeviceIndex": 0,
-                            "Groups": [self.get_security_group(region).group_id],
-                            "SubnetId": subnets[0].id,
-                            "AssociatePublicIpAddress": True,
-                            "DeleteOnTermination": True,
-                        }
-                    ],
-                )
-            except botocore.exceptions.ClientError as e:
-                if not "RequestLimitExceeded" in str(e):
-                    logger.error(f"Failed to provision instance (attempt {i}): {e}")
-                    raise e
-                else:
-                    logger.warning(f"RequestLimitExceeded, retrying ({i})")
-                    time.sleep(random.random() * 1)
-                    continue
-            instance[0].wait_until_running()
-            server = AWSServer(f"aws:{region}", instance[0].id)
-            server.wait_for_ready()
-            return server
-        raise Exception("Failed to provision instance")
+        def start_instance():
+            # todo instance-initiated-shutdown-behavior terminate
+            return ec2.create_instances(
+                ImageId="resolve:ssm:/aws/service/ecs/optimized-ami/amazon-linux-2/recommended/image_id",
+                InstanceType=instance_class,
+                MinCount=1,
+                MaxCount=1,
+                KeyName=f"skylark-{region}",
+                TagSpecifications=[
+                    {
+                        "ResourceType": "instance",
+                        "Tags": [{"Key": "Name", "Value": name}] + [{"Key": k, "Value": v} for k, v in tags.items()],
+                    }
+                ],
+                BlockDeviceMappings=[
+                    {
+                        "DeviceName": "/dev/sda1",
+                        "Ebs": {"DeleteOnTermination": True, "VolumeSize": ebs_volume_size, "VolumeType": "gp2"},
+                    }
+                ],
+                NetworkInterfaces=[
+                    {
+                        "DeviceIndex": 0,
+                        "Groups": [self.get_security_group(region).group_id],
+                        "SubnetId": subnets[0].id,
+                        "AssociatePublicIpAddress": True,
+                        "DeleteOnTermination": True,
+                    }
+                ],
+            )
+
+        instance = retry_backoff(start_instance, initial_backoff=1)
+        assert len(instance) == 1, f"Expected 1 instance, got {len(instance)}"
+        instance[0].wait_until_running()
+        server = AWSServer(f"aws:{region}", instance[0].id)
+        server.wait_for_ready()
+        return server
```
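Both retry loops in this PR (the `RequestLimitExceeded` loop above and the Docker-install loop in `server.py` below) are replaced by a shared `retry_backoff` helper in `skylark/utils/utils.py`. Its implementation is not part of this diff; judging only from the two call sites (`initial_backoff=1`, `exception_class=RuntimeError`), a minimal sketch consistent with them could look like:

```python
import random
import time

def retry_backoff(fn, max_retries=8, initial_backoff=1.0, max_backoff=60.0, exception_class=Exception):
    """Hypothetical sketch of skylark.utils.utils.retry_backoff; the real signature may differ."""
    backoff = initial_backoff
    for attempt in range(max_retries):
        try:
            return fn()
        except exception_class:
            if attempt == max_retries - 1:
                raise  # retries exhausted, surface the last error
            time.sleep(backoff * random.random())  # jittered sleep, echoing the old random.random() * 1
            backoff = min(backoff * 2, max_backoff)
```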
30 changes: 11 additions & 19 deletions skylark/compute/server.py
```diff
@@ -1,5 +1,4 @@
 import json
-import time
 import os
 import subprocess
 import threading
@@ -10,7 +9,7 @@
 import requests
 from skylark.utils import logger
 from skylark.compute.utils import make_dozzle_command, make_sysctl_tcp_tuning_command
-from skylark.utils.utils import PathLike, Timer, wait_for
+from skylark.utils.utils import PathLike, Timer, retry_backoff, wait_for
 
 import configparser
 import os
@@ -190,18 +189,10 @@ def install_docker(self):
         cmd = "(command -v docker >/dev/null 2>&1 || { rm -rf get-docker.sh; curl -fsSL https://get.docker.com -o get-docker.sh && sudo sh get-docker.sh; }); "
         cmd += "{ sudo docker stop $(docker ps -a -q); sudo docker kill $(sudo docker ps -a -q); sudo docker rm -f $(sudo docker ps -a -q); }; "
         cmd += f"(docker --version && echo 'Success, Docker installed' || echo 'Failed to install Docker'); "
-        for i in range(4):
-            out, err = self.run_command(cmd)
-            docker_version = out.strip().split("\n")[-1]
-            if not docker_version.startswith("Success"):  # retry since docker install fails sometimes
-                logger.error(f"Docker install failed, retrying! (attempt {i}): {out} {err}")
-                out, err = self.run_command(cmd)
-                docker_version = out.strip().split("\n")[-1]
-            else:
-                if not docker_version.startswith("Success"):
-                    raise Exception(f"Failed to install Docker on {self.region_tag}, {self.public_ip()}: OUT {out}\nERR {err}")
-                else:
-                    return
+        out, err = self.run_command(cmd)
+        docker_version = out.strip().split("\n")[-1]
+        if not docker_version.startswith("Success"):
+            raise RuntimeError(f"Failed to install Docker on {self.region_tag}, {self.public_ip()}: OUT {out}\nERR {err}")
 
     def start_gateway(
         self,
```
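Two things changed in `install_docker`: the inline retry loop is gone, and failures now raise `RuntimeError` instead of returning silently. The old control flow had a real bug here; its `raise` sat on a branch that could only be reached after the version check had already succeeded, so four failed attempts simply fell off the end of the loop with no error. Retrying is now the caller's job, via `retry_backoff(self.install_docker, exception_class=RuntimeError)` in the hunk below.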
```diff
@@ -211,7 +202,6 @@ def start_gateway(
         use_bbr=False,
     ):
         self.wait_for_ready()
-        time.sleep(2)
 
         def check_stderr(tup):
             assert tup[1].strip() == "", f"Command failed, err: {tup[1]}"
@@ -220,7 +210,7 @@ def check_stderr(tup):
 
         # increase TCP connections, enable BBR optionally and raise file limits
         check_stderr(self.run_command(make_sysctl_tcp_tuning_command(cc="bbr" if use_bbr else "cubic")))
-        self.install_docker()
+        retry_backoff(self.install_docker, exception_class=RuntimeError)
         self.run_command(make_dozzle_command(log_viewer_port))
 
         # read AWS config file to get credentials
```
```diff
@@ -240,8 +230,11 @@ def check_stderr(tup):
         docker_out, docker_err = self.run_command(f"sudo docker pull {gateway_docker_image}")
         assert "Status: Downloaded newer image" in docker_out or "Status: Image is up to date" in docker_out, (docker_out, docker_err)
         logger.debug(f"{desc_prefix}: Starting gateway container")
-        docker_run_flags = f"-d --rm --log-driver=local --ipc=host --network=host --ulimit nofile={1024 * 1024} {docker_envs}"
-        gateway_daemon_cmd = f"python -u /pkg/skylark/gateway/gateway_daemon.py --chunk-dir /dev/shm/skylark/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag}"
+        docker_run_flags = (
+            f"-d --rm --log-driver=local --log-opt max-file=16 --ipc=host --network=host --ulimit nofile={1024 * 1024} {docker_envs}"
+        )
+        docker_run_flags += " --mount type=tmpfs,dst=/skylark,tmpfs-size=$(($(free -b | head -n2 | tail -n1 | awk '{print $2}')/2))"
+        gateway_daemon_cmd = f"python -u /pkg/skylark/gateway/gateway_daemon.py --chunk-dir /skylark/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag}"
         docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skylark_gateway {gateway_docker_image} {gateway_daemon_cmd}"
         start_out, start_err = self.run_command(docker_launch_cmd)
         logger.debug(desc_prefix + f": Gateway started {start_out.strip()}")
```
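Chunk storage moves from `/dev/shm` to a dedicated tmpfs mounted at `/skylark`, explicitly capped at half of the host's physical RAM; the `tmpfs-size` arithmetic is evaluated by the shell on the remote host. The added `--log-opt max-file=16` also raises the number of rotated files kept by Docker's `local` log driver. A rough Python equivalent of the size expression, for illustration only:

```python
import subprocess

# `free -b` prints memory stats in bytes; line 2 ("Mem:"), column 2 is total RAM.
total_ram = int(subprocess.check_output(["free", "-b"]).decode().splitlines()[1].split()[1])
tmpfs_size = total_ram // 2  # cap the gateway's chunk store at half of physical memory
```

An explicitly sized mount also means the chunk store's free space is visible to ordinary filesystem queries, which the `chunk_store.py` change below relies on.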
```diff
@@ -251,7 +244,6 @@ def check_stderr(tup):
         gateway_container_hash = start_out.strip().split("\n")[-1][:12]
         self.gateway_api_url = f"http://{self.public_ip()}:8080/api/v1"
         self.gateway_log_viewer_url = f"http://{self.public_ip()}:8888/container/{gateway_container_hash}"
-        self.gateway_htop_url = f"http://{self.public_ip()}:8889"
 
         # wait for gateways to start (check status API)
         def is_ready():
```
18 changes: 6 additions & 12 deletions skylark/gateway/chunk_store.py
```diff
@@ -1,8 +1,8 @@
-import os
 from datetime import datetime
 from multiprocessing import Manager
 from os import PathLike
 from pathlib import Path
+import subprocess
 from typing import Dict, List, Optional
 
 from skylark.utils import logger
@@ -19,9 +19,6 @@ def __init__(self, chunk_dir: PathLike):
                 logger.warning(f"Deleting existing chunk file {chunk_file}")
                 chunk_file.unlink()
 
-        self.chunk_store_max_size = os.statvfs(self.chunk_dir).f_frsize * os.statvfs(self.chunk_dir).f_bfree
-        logger.info(f"Chunk directory {self.chunk_dir} can have max size {self.chunk_store_max_size} bytes")
-
         # multiprocess-safe concurrent structures
         self.manager = Manager()
         self.chunk_requests: Dict[int, ChunkRequest] = self.manager.dict()
@@ -50,14 +47,14 @@ def set_chunk_state(self, chunk_id: int, new_status: ChunkState, log_metadata: O
     def get_chunk_status_log(self) -> List[Dict]:
         return list(self.chunk_status_log)
 
-    def state_start_download(self, chunk_id: int, receiver_id: Optional[int] = None):
+    def state_start_download(self, chunk_id: int, receiver_id: Optional[str] = None):
         state = self.get_chunk_state(chunk_id)
         if state in [ChunkState.registered, ChunkState.download_in_progress]:
             self.set_chunk_state(chunk_id, ChunkState.download_in_progress, {"receiver_id": receiver_id})
         else:
             raise ValueError(f"Invalid transition start_download from {self.get_chunk_state(chunk_id)}")
 
-    def state_finish_download(self, chunk_id: int, receiver_id: Optional[int] = None):
+    def state_finish_download(self, chunk_id: int, receiver_id: Optional[str] = None):
         # todo log runtime to statistics store
         state = self.get_chunk_state(chunk_id)
         if state in [ChunkState.download_in_progress, ChunkState.downloaded]:
@@ -72,14 +69,14 @@ def state_queue_upload(self, chunk_id: int):
         else:
             raise ValueError(f"Invalid transition upload_queued from {self.get_chunk_state(chunk_id)}")
 
-    def state_start_upload(self, chunk_id: int, sender_id: Optional[int] = None):
+    def state_start_upload(self, chunk_id: int, sender_id: Optional[str] = None):
         state = self.get_chunk_state(chunk_id)
         if state in [ChunkState.upload_queued, ChunkState.upload_in_progress]:
             self.set_chunk_state(chunk_id, ChunkState.upload_in_progress, {"sender_id": sender_id})
         else:
             raise ValueError(f"Invalid transition start_upload from {self.get_chunk_state(chunk_id)}")
 
-    def state_finish_upload(self, chunk_id: int, sender_id: Optional[int] = None):
+    def state_finish_upload(self, chunk_id: int, sender_id: Optional[str] = None):
         # todo log runtime to statistics store
         state = self.get_chunk_state(chunk_id)
         if state in [ChunkState.upload_in_progress, ChunkState.upload_complete]:
@@ -111,8 +108,5 @@ def add_chunk_request(self, chunk_request: ChunkRequest, state=ChunkState.regist
         self.set_chunk_state(chunk_request.chunk.chunk_id, state)
         self.chunk_requests[chunk_request.chunk.chunk_id] = chunk_request
 
-    def used_bytes(self):
-        return sum(f.stat().st_size for f in self.chunk_dir.glob("*.chunk") if os.path.exists(f))
-
     def remaining_bytes(self):
-        return self.chunk_store_max_size - self.used_bytes()
+        return int(subprocess.check_output(["df", "-k", "--output=avail", self.chunk_dir]).decode().strip().split()[-1]) * 1024
```
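With the size cap now enforced by the tmpfs mount itself, `remaining_bytes` asks the filesystem directly instead of subtracting tracked usage from a precomputed maximum: `df -k --output=avail` prints a header row plus the available space in 1 KiB blocks, hence the final `split()[-1]` and the `* 1024`. A standard-library equivalent (my illustration, not the PR's code):

```python
import shutil

def remaining_bytes(chunk_dir: str) -> int:
    # statvfs-backed free space, matching the "Avail" column that df reports
    return shutil.disk_usage(chunk_dir).free
```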