[API] Performance Logging #669

Merged: 13 commits, Nov 8, 2022
1 change: 0 additions & 1 deletion scripts/requirements-gateway.txt
@@ -17,7 +17,6 @@ google-cloud-storage
cachetools
paramiko
rich
typer
# gateway dependencies
flask
lz4
7 changes: 5 additions & 2 deletions skyplane/api/client.py
@@ -4,6 +4,7 @@

from typing import TYPE_CHECKING, Optional

from skyplane.api.usage.client import get_clientid
from skyplane.api.dataplane import Dataplane
from skyplane.api.impl.path import parse_path
from skyplane.api.impl.planner import DirectPlanner
@@ -25,6 +26,7 @@ def __init__(
transfer_config: Optional[TransferConfig] = None,
log_dir: Optional[str] = None,
):
self.clientid = get_clientid()
self.aws_auth = aws_config.make_auth_provider() if aws_config else None
self.azure_auth = azure_config.make_auth_provider() if azure_config else None
self.gcp_auth = gcp_config.make_auth_provider() if gcp_config else None
@@ -40,7 +42,7 @@ def __init__(
logger.open_log_file(self.log_dir / "client.log")

self.provisioner = Provisioner(
host_uuid=uuid.UUID(int=uuid.getnode()).hex,
host_uuid=self.clientid,
aws_auth=self.aws_auth,
azure_auth=self.azure_auth,
gcp_auth=self.gcp_auth,
@@ -73,6 +75,7 @@ def dataplane(
n_vms: int = 1,
num_connections: int = 32,
) -> Dataplane:
# print(self.clientid)
if type == "direct":
planner = DirectPlanner(
src_cloud_provider,
@@ -84,7 +87,7 @@
)
topo = planner.plan()
logger.fs.info(f"[SkyplaneClient.direct_dataplane] Topology: {topo.to_json()}")
return Dataplane(topology=topo, provisioner=self.provisioner, transfer_config=self.transfer_config)
return Dataplane(clientid=self.clientid, topology=topo, provisioner=self.provisioner, transfer_config=self.transfer_config)
else:
raise NotImplementedError(f"Dataplane type {type} not implemented")

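The change above swaps the MAC-address-derived `uuid.getnode()` for a `clientid` from `skyplane.api.usage.client.get_clientid`, so the same id can tag both provisioning and usage logs. A minimal sketch of a persistent, anonymized client id helper, assuming a cached-file design; the real `get_clientid` implementation and its cache path may differ:

```python
import uuid
from pathlib import Path

# Hypothetical cache location; the path used by the real get_clientid may differ.
CLIENT_ID_PATH = Path.home() / ".skyplane" / "client_id"

def get_clientid_sketch() -> str:
    """Return a stable random client id, creating and caching one on first use."""
    if CLIENT_ID_PATH.exists():
        return CLIENT_ID_PATH.read_text().strip()
    client_id = uuid.uuid4().hex  # random, unlike the hardware-derived uuid.getnode()
    CLIENT_ID_PATH.parent.mkdir(parents=True, exist_ok=True)
    CLIENT_ID_PATH.write_text(client_id)
    return client_id
```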
7 changes: 5 additions & 2 deletions skyplane/api/dataplane.py
@@ -1,7 +1,7 @@
import json
import os
import threading
from collections import defaultdict
from collections import defaultdict, Counter
from functools import partial

import nacl.secret
@@ -39,14 +39,17 @@ class Dataplane:

def __init__(
self,
clientid: str,
topology: ReplicationTopology,
provisioner: "Provisioner",
transfer_config: TransferConfig,
):
self.clientid = clientid
self.topology = topology
self.src_region_tag = self.topology.source_region()
self.dst_region_tag = self.topology.sink_region()
self.max_instances = int(len(self.topology.gateway_nodes) / 2)
regions = Counter([node.region.split(":")[1] for node in self.topology.gateway_nodes])
Contributor comment: In theory there shouldn't be collisions between clouds, but maybe we should index it as ("aws", "us-east-1") instead?

self.max_instances = int(regions[max(regions, key=regions.get)])
self.provisioner = provisioner
self.transfer_config = transfer_config
self.http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3))
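The new `max_instances` above is the gateway count of the most heavily used region, computed with a `Counter` over region names. A small sketch of the merged logic next to the reviewer's suggested `(cloud, region)` keying; function names and sample tags here are illustrative:

```python
from collections import Counter
from typing import List

def max_instances_by_region(region_tags: List[str]) -> int:
    """Merged logic: key by region name only, e.g. 'aws:us-east-1' -> 'us-east-1'."""
    regions = Counter(tag.split(":")[1] for tag in region_tags)
    # most_common(1)[0][1] is equivalent to regions[max(regions, key=regions.get)]
    return regions.most_common(1)[0][1]

def max_instances_by_cloud_region(region_tags: List[str]) -> int:
    """Reviewer's suggestion: key by ('aws', 'us-east-1') to rule out cross-cloud collisions."""
    regions = Counter(tuple(tag.split(":", 1)) for tag in region_tags)
    return regions.most_common(1)[0][1]

# With tags ["aws:us-east-1", "aws:us-east-1", "gcp:us-east1"], both return 2.
```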
69 changes: 46 additions & 23 deletions skyplane/api/impl/tracker.py
@@ -24,8 +24,7 @@ def __init__(self, dataplane, jobs: List["TransferJob"], transfer_config: Transf
self.dataplane = dataplane
self.type_list = set([job.type for job in jobs])
self.recursive_list = set([str(job.recursive) for job in jobs])
self.size = sum([job.calculate_size() for job in jobs])


self.jobs = {job.uuid: job for job in jobs}
self.transfer_config = transfer_config

@@ -51,7 +50,7 @@ def __str__(self):
def run(self):
src_cloud_provider = self.dataplane.src_region_tag.split(":")[0]
dst_cloud_provider = self.dataplane.dst_region_tag.split(":")[0]

args = {
"cmd": ",".join(self.type_list),
"recursive": ",".join(self.recursive_list),
@@ -73,51 +72,61 @@
f"[TransferProgressTracker] Job {job.uuid} dispatched with {len(self.job_chunk_requests[job_uuid])} chunk requests"
)
except Exception as e:
UsageClient.log_exception("dispatch job", e, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag,
session_start_timestamp_ms)
UsageClient.log_exception(
"dispatch job", e, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag, session_start_timestamp_ms
)
raise e

# Record only the transfer time
start_time = int(time.time())
try:
try:
self.monitor_transfer()
except exceptions.SkyplaneGatewayException as err:
reformat_err = Exception(err.pretty_print_str())
UsageClient.log_exception("monitor transfer", reformat_err, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag,
session_start_timestamp_ms)
reformat_err = Exception(err.pretty_print_str()[37:])
UsageClient.log_exception(
"monitor transfer",
reformat_err,
args,
self.dataplane.src_region_tag,
self.dataplane.dst_region_tag,
session_start_timestamp_ms,
)
raise err
except Exception as e:
UsageClient.log_exception("monitor transfer", e, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag,
session_start_timestamp_ms)
UsageClient.log_exception(
"monitor transfer", e, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag, session_start_timestamp_ms
)
raise e
end_time = int(time.time())

try:
for job in self.jobs.values():
logger.fs.debug(f"[TransferProgressTracker] Finalizing job {job.uuid}")
job.finalize()
except Exception as e:
UsageClient.log_exception("finalize job", e, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag,
session_start_timestamp_ms)
UsageClient.log_exception(
"finalize job", e, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag, session_start_timestamp_ms
)
raise e

try:
for job in self.jobs.values():
logger.fs.debug(f"[TransferProgressTracker] Verifying job {job.uuid}")
job.verify()
except Exception as e:
UsageClient.log_exception("verify job", e, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag,
session_start_timestamp_ms)
UsageClient.log_exception(
"verify job", e, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag, session_start_timestamp_ms
)
raise e

# transfer successfully completed
transfer_stats = {
"total_runtime_s": end_time - start_time,
"throughput_gbits": self.size / (end_time - start_time),
"throughput_gbits": self.calculate_size() / (end_time - start_time),
}
UsageClient.log_transfer(transfer_stats, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag,
session_start_timestamp_ms)

UsageClient.log_transfer(
transfer_stats, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag, session_start_timestamp_ms
)

@imports.inject("pandas")
def monitor_transfer(pd, self):
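`@imports.inject("pandas")` passes the lazily imported module as the leading `pd` parameter, which is why `monitor_transfer` takes `pd` before `self`. A hedged sketch of how such an injection decorator can be built; this is an illustration, not Skyplane's actual `imports.inject`:

```python
import functools
import importlib

def inject(module_name: str):
    """Decorator: import module_name at call time and pass it as the first argument."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            module = importlib.import_module(module_name)  # deferred until first call
            return fn(module, *args, **kwargs)
        return wrapper
    return decorator

@inject("json")
def dumps_with(json_mod, obj):
    return json_mod.dumps(obj)

assert dumps_with({"a": 1}) == '{"a": 1}'
```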
@@ -203,3 +212,17 @@ def query_bytes_remaining(self):
)
logger.fs.debug(f"[TransferProgressTracker] Bytes remaining per job: {bytes_remaining_per_job}")
return sum(bytes_remaining_per_job.values())

def calculate_size(self):
if len(self.job_chunk_requests) == 0:
return 0
bytes_total_per_job = {}
for job_uuid in self.job_complete_chunk_ids.keys():
bytes_total_per_job[job_uuid] = sum(
[
cr.chunk.chunk_length_bytes
for cr in self.job_chunk_requests[job_uuid]
if cr.chunk.chunk_id in self.job_complete_chunk_ids[job_uuid]
]
)
return sum(bytes_total_per_job.values()) / (2**30)
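`calculate_size` now totals the lengths of completed chunks already tracked in `job_chunk_requests`, replacing the per-transfer bucket re-listing that `TransferJob.calculate_size` did (removed below). Note it returns GiB (bytes / 2**30), so the `throughput_gbits` stat above divides GiB by seconds. A standalone sketch of the arithmetic with an explicit gigabit conversion for comparison; the numbers are made up:

```python
GIB = 2**30

completed_chunk_bytes = [512 * 2**20, 512 * 2**20]  # two completed 512 MiB chunks
runtime_s = 8

size_gib = sum(completed_chunk_bytes) / GIB        # 1.0 GiB
as_logged = size_gib / runtime_s                   # 0.125, i.e. GiB/s under the "gbits" key
gbits_per_s = sum(completed_chunk_bytes) * 8 / 1e9 / runtime_s  # ~1.07 true Gbit/s
```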
42 changes: 4 additions & 38 deletions skyplane/api/impl/transfer_job.py
@@ -32,6 +32,7 @@ class TransferJob:
recursive: bool = False
requester_pays: bool = False
uuid: str = field(init=False, default_factory=lambda: str(uuid.uuid4()))
type: str = ""

def __post_init__(self):
provider_src, bucket_src, self.src_prefix = parse_path(self.src_path)
@@ -56,41 +57,6 @@ def estimate_cost(self):
# TODO
raise NotImplementedError

def calculate_size(self):
# If errors, then return 0 size. Error is raised in _transfer_pair_generator().
if not self.src_iface.bucket_exists():
return 0
if not self.dst_iface.bucket_exists():
return 0

n_objs = 0
size = 0
for obj in self.src_iface.list_objects(self.src_prefix):
if self._pre_filter_fn(obj):
try:
dest_key = self._map_object_key_prefix(self.src_prefix, obj.key, self.dst_prefix, recursive=self.recursive)
except exceptions.MissingObjectException as e:
continue

dest_provider, dest_region = self.dst_iface.region_tag().split(":")
if dest_provider == "aws":
dest_obj = S3Object(dest_provider, self.dst_iface.bucket(), dest_key)
elif dest_provider == "azure":
dest_obj = AzureBlobObject(dest_provider, self.dst_iface.bucket(), dest_key)
elif dest_provider == "gcp":
dest_obj = GCSObject(dest_provider, self.dst_iface.bucket(), dest_key)
else:
return 0

if self._post_filter_fn(obj, dest_obj):
size += obj.size / (1024**3)
n_objs += 1

if n_objs == 0:
return 0

return size

def _transfer_pair_generator(self) -> Generator[Tuple[ObjectStoreObject, ObjectStoreObject], None, None]:
"""Query source region and return list of objects to transfer."""
if not self.src_iface.bucket_exists():
@@ -185,9 +151,9 @@ def _post_filter_fn(cls, src_obj: ObjectStoreObject, dest_obj: ObjectStoreObject
class CopyJob(TransferJob):
transfer_list: list = field(default_factory=list) # transfer list for later verification
multipart_transfer_list: list = field(default_factory=list)
type: str = "copy"

def __post_init__(self):
self.type = "copy"
self.http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3))
return super().__post_init__()

@@ -265,9 +231,9 @@ def verify(self):

@dataclass
class SyncJob(CopyJob):

type: str = "sync"

def __post_init__(self):
self.type = "sync"
return super().__post_init__()

@classmethod
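Moving the `type` tag from a `__post_init__` assignment to a class-level dataclass field lets each subclass declare its tag as a default, so it shows up in the generated `__init__` signature and needs no mutation after construction. A minimal self-contained sketch of the pattern with hypothetical class names:

```python
import uuid
from dataclasses import dataclass, field

@dataclass
class Job:
    src_path: str
    uuid: str = field(init=False, default_factory=lambda: str(uuid.uuid4()))
    type: str = ""  # subclasses override the default rather than setting it in __post_init__

@dataclass
class Copy(Job):
    type: str = "copy"

@dataclass
class Sync(Copy):
    type: str = "sync"

assert Sync(src_path="s3://bucket/key").type == "sync"
```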
32 changes: 0 additions & 32 deletions skyplane/api/test/api_demo_copy.py

This file was deleted.
