Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix inconsistencies between VCPU and R2 merge #857

Merged
merged 223 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
223 commits
Select commit Hold shift + click to select a range
e66f065
Add Broadcast client API interface (#675)
lynnliu030 Nov 29, 2022
8b842c1
add bucket replication script for experiments
sarahwooders Nov 29, 2022
d6a5a9a
fix region logic
sarahwooders Dec 1, 2022
e15d8fb
merge
sarahwooders Dec 1, 2022
dfecc9c
add log copying
sarahwooders Dec 2, 2022
5881d45
update transfer cost grid
lynnliu030 Dec 4, 2022
9f9c4dd
switch batch to recursive
sarahwooders Dec 5, 2022
771f4dd
Merge branch 'broadcast' of github.com:skyplane-project/skyplane into…
sarahwooders Dec 5, 2022
43a9514
change the upload ids
lynnliu030 Dec 6, 2022
0133f32
put
lynnliu030 Dec 7, 2022
0da6d43
changes for broadcast multipart
lynnliu030 Dec 8, 2022
59b4176
Mux_and fix (#718)
sarahwooders Dec 8, 2022
99f584d
couple gateawy fixes
sarahwooders Dec 8, 2022
d785084
just return 0
sarahwooders Dec 8, 2022
d6f7065
fix bc verification
lynnliu030 Dec 8, 2022
1093d5f
print per-dst remaining bytes
lynnliu030 Dec 8, 2022
8fb9a72
fix obj store wait time
sarahwooders Dec 8, 2022
689386b
Merge branch 'multipart' of github.com:skyplane-project/skyplane into…
sarahwooders Dec 8, 2022
9c56692
assert completed = file deleted
sarahwooders Dec 8, 2022
b5350d0
minor fix
sarahwooders Dec 8, 2022
b4eb68d
accidentally deleted line
sarahwooders Dec 9, 2022
28ca507
fix issue with terminal operators
sarahwooders Dec 9, 2022
c7e56da
update tput profile & ILP
lynnliu030 Dec 9, 2022
651fb97
update run config
lynnliu030 Dec 9, 2022
3590de7
reduce client parallism
sarahwooders Dec 9, 2022
aa83a89
count num processes
sarahwooders Dec 10, 2022
fb460b6
add gw programs
lynnliu030 Dec 10, 2022
bbbfd30
add back print
sarahwooders Dec 10, 2022
207b8f4
modify gateway program processing on gateway side
sarahwooders Dec 10, 2022
5a798f7
working 5 dest transfer
sarahwooders Dec 11, 2022
c176230
broadcast random
lynnliu030 Dec 11, 2022
b5e3d18
add more regions
sarahwooders Dec 11, 2022
04b84f0
increase queue size
sarahwooders Dec 11, 2022
9befa07
merge
lynnliu030 Dec 12, 2022
eb6772e
change regions
sarahwooders Dec 12, 2022
02882c6
fix process counting
sarahwooders Dec 12, 2022
5f22237
reduce # of connections
lynnliu030 Dec 12, 2022
5a6cfe6
lower # of connections
lynnliu030 Dec 12, 2022
4db2767
fix error
sarahwooders Dec 12, 2022
de19866
Merge branch 'multipart' of github.com:skyplane-project/skyplane into…
sarahwooders Dec 12, 2022
7c43ab2
add banned nodes
sarahwooders Dec 12, 2022
3104803
Filter out specific regions and fix ILP (#723)
sarahwooders Dec 13, 2022
a47c341
merge
lynnliu030 Dec 13, 2022
319756b
remove multiplication
lynnliu030 Dec 13, 2022
b0c6ac8
remove multiplication
lynnliu030 Dec 13, 2022
4a5cdc4
Check gbyte_to_transfer
parasj Dec 13, 2022
2682608
script with aws/gcp/azure
lynnliu030 Dec 13, 2022
cb7b514
update num_vms for iterative ILP
lynnliu030 Dec 13, 2022
3aa336a
Merge remote-tracking branch 'origin/main' into multipart
parasj Dec 13, 2022
ec3d9c5
update aws script
lynnliu030 Dec 13, 2022
4e7554e
reset queue sizes
sarahwooders Dec 13, 2022
0f686c2
merge
sarahwooders Dec 13, 2022
ca4115f
Increase retry pool by default
parasj Dec 13, 2022
d1c956f
update script
lynnliu030 Dec 13, 2022
76f68d8
update script
lynnliu030 Dec 13, 2022
e44d5b3
modify connection num
sarahwooders Dec 23, 2022
34f2fb3
Merge branch 'multipart' of github.com:skyplane-project/skyplane into…
sarahwooders Dec 23, 2022
b7f2956
fixed ips
lynnliu030 Dec 24, 2022
5f68afb
Merge branch 'multipart' of https://github.com/skyplane-project/skypl…
lynnliu030 Dec 24, 2022
ac725ee
merge
sarahwooders Dec 24, 2022
7091c90
add topology plotting during runtime
sarahwooders Dec 29, 2022
7ffb026
update visualize gateway program
sarahwooders Dec 29, 2022
3b4c573
fix partitions
lynnliu030 Dec 29, 2022
6491a99
partially implemented support for reading existing gw program
sarahwooders Dec 30, 2022
a0827ec
Merge branch 'multipart' of github.com:skyplane-project/skyplane into…
sarahwooders Dec 30, 2022
a70b712
change instance types
sarahwooders Dec 31, 2022
1cdf740
add log directory for deprovision
sarahwooders Jan 5, 2023
d4c065a
reduce gateway cp parallism and fix recieve bug
sarahwooders Jan 14, 2023
b46e8dc
fix tracker timer
lynnliu030 Jan 31, 2023
862d36b
tracker output
lynnliu030 Feb 5, 2023
1e920e4
update p2p algorithms
lynnliu030 Feb 6, 2023
f5966b8
fix region issue
sarahwooders Feb 13, 2023
d131b75
merge
sarahwooders Feb 13, 2023
4694265
test broadcast object store in gcp
sarahwooders Feb 25, 2023
8f5c707
merge in changes from main
sarahwooders Feb 25, 2023
7baa9ba
stash
sarahwooders Feb 28, 2023
65f9df8
stash
sarahwooders Feb 28, 2023
c358bcb
map subregions
sarahwooders Mar 2, 2023
a56aafc
add basic obj store interfacing to client, write tests, and fix bucke…
sarahwooders Mar 8, 2023
6484f35
reformat
sarahwooders Mar 8, 2023
63ac676
fix formatting
sarahwooders Mar 8, 2023
369de0d
temporarily give up on azure
sarahwooders Mar 8, 2023
ab97b84
move client test to integration test
sarahwooders Mar 8, 2023
d296033
add new files and use generator
sarahwooders Mar 13, 2023
00c5088
reformat
sarahwooders Mar 13, 2023
16ab757
reformat
sarahwooders Mar 13, 2023
4081a36
fix imports
sarahwooders Mar 13, 2023
c0eb132
fix formatting
sarahwooders Mar 13, 2023
dffee82
add basic obj store interfacing to client, write tests, and fix bucke…
sarahwooders Mar 8, 2023
bcda24b
reformat
sarahwooders Mar 8, 2023
6b583f8
fix formatting
sarahwooders Mar 8, 2023
acd101e
temporarily give up on azure
sarahwooders Mar 8, 2023
10f6cd3
move client test to integration test
sarahwooders Mar 8, 2023
69f5af6
add new files and use generator
sarahwooders Mar 13, 2023
e957d4f
reformat
sarahwooders Mar 13, 2023
afa0ec8
fix imports
sarahwooders Mar 13, 2023
5206e0d
fix formatting
sarahwooders Mar 13, 2023
6b4fb9a
add cost function
sarahwooders Mar 14, 2023
03c100d
reformat and remove variables not needed
lynnliu030 Mar 14, 2023
d161e3d
add cost estimation to client dataplane
sarahwooders Mar 14, 2023
f481516
Merge branch 'main' of github.com:sarahwooders/skyplane
sarahwooders Mar 14, 2023
00d0667
add transfer pairs
sarahwooders Mar 14, 2023
8040dd3
merge
sarahwooders Mar 15, 2023
400214a
add logging for error
sarahwooders Mar 15, 2023
92f5d41
error prints
sarahwooders Mar 15, 2023
4fb9a0f
file size fix?
sarahwooders Mar 15, 2023
5805021
fix error? idk
sarahwooders Mar 15, 2023
b4014d3
Merge branch 'skyplane-project:main' into main
sarahwooders Apr 8, 2023
67e5a39
dataplane
sarahwooders Apr 8, 2023
ad1a491
initial implementation
sarahwooders Apr 10, 2023
e9bc643
add pipeline file
sarahwooders Apr 10, 2023
1a4c225
reformat
sarahwooders Apr 10, 2023
8b80f25
add upload id pipelining for multipart
sarahwooders Apr 11, 2023
d46b2d2
initial TransferJob rework (multicast)
abiswal2001 Apr 11, 2023
e313be2
fix transfer generation, but gateway wont start
sarahwooders Apr 12, 2023
c34277d
add deprovisioning and copy error logs
sarahwooders Apr 12, 2023
643a203
half way through removing chunk req
sarahwooders Apr 12, 2023
9d58956
direct transfer works
sarahwooders Apr 14, 2023
c8f99b6
remove docker script for old gateway
sarahwooders Apr 14, 2023
1c80d39
reformat
sarahwooders Apr 14, 2023
ec19129
fix broadcast important
sarahwooders Apr 16, 2023
3f68bb5
working multicast but broken transfer tracking
sarahwooders Apr 21, 2023
0858e87
add multi dest tracker
sarahwooders Apr 21, 2023
460d78c
reformat/cleanup
sarahwooders Apr 21, 2023
9c12137
scaffold more planners
sarahwooders Apr 21, 2023
f081063
implement verification
sarahwooders Apr 24, 2023
123e799
fix different prefix
sarahwooders Apr 24, 2023
4020e14
cleanup
sarahwooders Apr 24, 2023
bee41b2
try to fix docs
sarahwooders Apr 24, 2023
3934177
remove old imports
sarahwooders Apr 24, 2023
cd7876b
remove pandas
sarahwooders Apr 24, 2023
7cfe700
update poetry
sarahwooders Apr 24, 2023
1bd7110
remove experiment import
sarahwooders Apr 25, 2023
0a18559
fix most tests
sarahwooders Apr 25, 2023
8de0414
reformat
sarahwooders Apr 25, 2023
4c85d3b
merge
sarahwooders Apr 25, 2023
68b3867
Merge branch 'sarahwooders-gateway-program-refactor'
sarahwooders Apr 25, 2023
47d0cda
cleanup
sarahwooders Apr 25, 2023
8b8b718
fixed after merge thank god
sarahwooders Apr 25, 2023
14b5580
reformat
sarahwooders Apr 25, 2023
9256710
reformat and add cost estimate fixes
sarahwooders Apr 25, 2023
1bde576
add back throughput
sarahwooders Apr 26, 2023
dfd104b
more cleanup
sarahwooders Apr 26, 2023
36cc6a7
cleanup
sarahwooders Apr 26, 2023
096fc05
remove dockerfile
sarahwooders Apr 26, 2023
5c99bf6
fix ibm imports
sarahwooders Apr 26, 2023
7418ee4
fix imports
sarahwooders Apr 26, 2023
8b4b25f
more cleanuP
sarahwooders Apr 26, 2023
35c90b0
fix ibm imports and pbar
sarahwooders Apr 26, 2023
d2a0aed
add bar for multipart completion
sarahwooders Apr 26, 2023
cc51688
cleanup and remove ibm test
sarahwooders Apr 26, 2023
4599973
forgot to add operator files
sarahwooders Apr 26, 2023
c793ea8
support CLI
sarahwooders Apr 27, 2023
e66fdba
comment out on-prem
sarahwooders Apr 27, 2023
00caafe
ignore solver for linting
sarahwooders Apr 27, 2023
2dc42de
reformat
sarahwooders Apr 27, 2023
e737d3d
format
sarahwooders Apr 27, 2023
2952746
fix
sarahwooders Apr 27, 2023
82030b6
fix errors
sarahwooders Apr 27, 2023
d0017e1
fix pytype issues
sarahwooders Apr 27, 2023
3238983
fix transfer list bug
sarahwooders Apr 29, 2023
5883e51
Merge branch 'skyplane-project:main' into main
sarahwooders Apr 30, 2023
acd952f
add private ips
sarahwooders Apr 30, 2023
08059aa
merge
sarahwooders Apr 30, 2023
2649fe2
add back region tag check
sarahwooders Apr 30, 2023
bff0155
Merge branch 'main' of github.com:sarahwooders/skyplane
sarahwooders Apr 30, 2023
618dbb4
cleanup
sarahwooders Apr 30, 2023
8a5ef62
remove pop
sarahwooders Apr 30, 2023
c2a2242
fix queue
sarahwooders Apr 30, 2023
07f579d
fix pytype
sarahwooders Apr 30, 2023
167f894
fix errors
sarahwooders Apr 30, 2023
cacdf03
disable ibmcloud for skyplane init
sarahwooders May 2, 2023
8980a21
Merge branch 'skyplane-project:main' into main
sarahwooders May 2, 2023
3076b9a
update poetry
sarahwooders May 2, 2023
f3d448b
Merge branch 'main' of github.com:sarahwooders/skyplane
sarahwooders May 2, 2023
e093a42
reformat
sarahwooders May 2, 2023
df517dd
add integration for pull req
sarahwooders May 2, 2023
772cff5
Add gateway start exception handling
sarahwooders May 2, 2023
d1b3d91
fix planning for single region transfers (same source/destination reg…
sarahwooders May 2, 2023
ea882ec
Merge remote-tracking branch 'upstream/main'
sarahwooders May 2, 2023
82f6266
rm print
sarahwooders May 2, 2023
aefcd10
fix azure private ip
sarahwooders May 2, 2023
6c63718
Merge remote-tracking branch 'upstream/main'
sarahwooders May 2, 2023
36c8fcf
reformat
sarahwooders May 2, 2023
4eaa864
refactor and cleanup client code
sarahwooders May 2, 2023
1ba24cd
Merge remote-tracking branch 'upstream/main'
sarahwooders May 2, 2023
01e9158
reformat
sarahwooders May 3, 2023
a2edb08
merge
sarahwooders May 3, 2023
d41427c
add local tests
sarahwooders May 3, 2023
f29bb76
fix tests
sarahwooders May 3, 2023
1486b85
add s3 interface
sarahwooders May 3, 2023
f3521dc
Refactor CLI transfer code and support local fallback (#829)
sarahwooders May 3, 2023
68eb8d5
Update integration-test-local.yml
sarahwooders May 3, 2023
6e5cfd1
Update integration-test-multiple-sizes.yml
sarahwooders May 3, 2023
bf0cb99
commit
sarahwooders May 3, 2023
5ea262b
add waiting and retry for multipart completion
sarahwooders May 5, 2023
2d4a4cc
reformat
sarahwooders May 5, 2023
269a9cb
Merge remote-tracking branch 'upstream/main'
sarahwooders May 5, 2023
18069fd
Merge branch 'integration-tests' into main
sarahwooders May 5, 2023
9e9bbd1
set multipart flag
sarahwooders May 5, 2023
4643983
Merge branch 'main' of github.com:sarahwooders/skyplane
sarahwooders May 5, 2023
f19fc34
merge
sarahwooders May 9, 2023
0258def
broken
sarahwooders May 10, 2023
11e3eef
worked for 1TB
sarahwooders May 11, 2023
8336835
reformat
sarahwooders May 11, 2023
f0370eb
cleanup
sarahwooders May 11, 2023
e277f4d
reformat
sarahwooders May 11, 2023
38a2de8
poetry
sarahwooders May 11, 2023
ed20f35
cleanup
sarahwooders May 15, 2023
427db15
Merge branch 'main' of github.com:sarahwooders/skyplane
sarahwooders May 31, 2023
894420c
add r2 initial implementation
sarahwooders May 31, 2023
8946abd
broken r2 implementation
sarahwooders Jun 1, 2023
8fbb3be
reformat
sarahwooders Jun 1, 2023
5c335aa
working cloudflare destination
sarahwooders Jun 5, 2023
eb3b49b
working destination transfer
sarahwooders Jun 5, 2023
65be703
cleanup
sarahwooders Jun 5, 2023
b266636
more cleanup
sarahwooders Jun 5, 2023
2217329
fix pytype
sarahwooders Jun 6, 2023
1904c3d
more fixes
sarahwooders Jun 6, 2023
ce77e80
Merge branch 'main' into r2
sarahwooders Jun 6, 2023
01212da
Merge remote-tracking branch 'upstream/main' into r2
sarahwooders Jun 6, 2023
9c35e8b
fix merge errors
sarahwooders Jun 6, 2023
1519aad
fix
sarahwooders Jun 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions skyplane/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ class TransferConfig:
ibmcloud_instance_class: str = "bx2-2x8"
gcp_use_premium_network: bool = True

aws_vcpu_file: Path = aws_quota_path
gcp_vcpu_file: Path = gcp_quota_path
azure_vcpu_file: Path = azure_standardDv5_quota_path
# TODO: add ibmcloud when the quota info is available

# multipart config
multipart_enabled: bool = True
multipart_threshold_mb: int = 128
Expand Down
6 changes: 3 additions & 3 deletions skyplane/api/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ def __init__(
# planner
self.planning_algorithm = planning_algorithm
if self.planning_algorithm == "direct":
self.planner = MulticastDirectPlanner(self.max_instances, 64)
self.planner = MulticastDirectPlanner(self.max_instances, 64, self.transfer_config)
elif self.planning_algorithm == "src_one_sided":
self.planner = DirectPlannerSourceOneSided(self.max_instances, 64)
self.planner = DirectPlannerSourceOneSided(self.max_instances, 64, self.transfer_config)
elif self.planning_algorithm == "dst_one_sided":
self.planner = DirectPlannerDestOneSided(self.max_instances, 64)
self.planner = DirectPlannerDestOneSided(self.max_instances, 64, self.transfer_config)
else:
raise ValueError(f"No such planning algorithm {planning_algorithm}")

Expand Down
190 changes: 139 additions & 51 deletions skyplane/planner/planner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from importlib.resources import path
from typing import Dict, List, Optional, Tuple, Tuple
import re
import os

from skyplane import compute
from skyplane.api.config import TransferConfig
Expand All @@ -21,12 +22,30 @@
import json

from skyplane.utils.fn import do_parallel
from skyplane.config_paths import config_path, azure_standardDv5_quota_path, aws_quota_path, gcp_quota_path
from skyplane.config import SkyplaneConfig


class Planner:
# Only supporting "aws:m5.", "azure:StandardD_v5", and "gcp:n2-standard" instances for now
_VCPUS = (96, 64, 48, 32, 16, 8, 4, 2)

def __init__(self, transfer_config: TransferConfig):
self.transfer_config = transfer_config
self.config = SkyplaneConfig.load_config(config_path)

# Loading the quota information, add ibm cloud when it is supported
self.quota_limits = {}
if os.path.exists(aws_quota_path):
with aws_quota_path.open("r") as f:
self.quota_limits["aws"] = json.load(f)
if os.path.exists(azure_standardDv5_quota_path):
with azure_standardDv5_quota_path.open("r") as f:
self.quota_limits["azure"] = json.load(f)
if os.path.exists(gcp_quota_path):
with gcp_quota_path.open("r") as f:
self.quota_limits["gcp"] = json.load(f)

def plan(self) -> TopologyPlan:
raise NotImplementedError

Expand Down Expand Up @@ -74,10 +93,10 @@ def _vcpus_to_vm(cloud_provider: str, vcpus: int) -> str:

class UnicastDirectPlanner(Planner):
# DO NOT USE THIS - broken for single-region transfers
def __init__(self, n_instances: int, n_connections: int):
def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig):
self.n_instances = n_instances
self.n_connections = n_connections
super().__init__()
super().__init__(transfer_config)

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
# make sure only single destination
Expand Down Expand Up @@ -140,25 +159,10 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:


class MulticastDirectPlanner(Planner):
n_instances: int
n_connections: int
transfer_config: TransferConfig

def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig):
self.n_instances = n_instances
self.n_connections = n_connections
self.transfer_config = transfer_config

# Loading the quota information, add ibm cloud when it is supported
self.quota_limits = {}
with self.transfer_config.aws_vcpu_file.open("r") as f:
self.quota_limits["aws"] = json.load(f)
with self.transfer_config.gcp_vcpu_file.open("r") as f:
self.quota_limits["gcp"] = json.load(f)
with self.transfer_config.azure_vcpu_file.open("r") as f:
self.quota_limits["azure"] = json.load(f)

super().__init__()
super().__init__(transfer_config)

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
src_region_tag = jobs[0].src_iface.region_tag()
Expand Down Expand Up @@ -329,14 +333,9 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[int, int]]:
return (vcpus, n_instances)


class DirectPlannerSourceOneSided(Planner):
class DirectPlannerSourceOneSided(MulticastDirectPlanner):
"""Planner that only creates VMs in the source region"""

def __init__(self, n_instances: int, n_connections: int):
self.n_instances = n_instances
self.n_connections = n_connections
super().__init__()

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
src_region_tag = jobs[0].src_iface.region_tag()
dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces]
Expand Down Expand Up @@ -391,13 +390,124 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
return plan


class DirectPlannerDestOneSided(Planner):
class DirectPlannerDestOneSided(MulticastDirectPlanner):
"""Planner that only creates instances in the destination region"""

def __init__(self, n_instances: int, n_connections: int):
self.n_instances = n_instances
self.n_connections = n_connections
super().__init__()
def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
# only create in destination region
src_region_tag = jobs[0].src_iface.region_tag()
dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces]
# jobs must have same sources and destinations
for job in jobs[1:]:
assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region"
assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set"

plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags)
# TODO: use VM limits to determine how many instances to create in each region
# TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions
for i in range(self.n_instances):
for dst_region_tag in dst_region_tags:
plan.add_gateway(dst_region_tag)

# initialize gateway programs per region
dst_program = {dst_region: GatewayProgram() for dst_region in dst_region_tags}

# iterate through all jobs
for job in jobs:
src_bucket = job.src_iface.bucket()
src_region_tag = job.src_iface.region_tag()
src_provider = src_region_tag.split(":")[0]

partition_id = jobs.index(job)

# send to all destination
dst_prefixes = job.dst_prefixes
for i in range(len(job.dst_ifaces)):
dst_iface = job.dst_ifaces[i]
dst_prefix = dst_prefixes[i]
dst_region_tag = dst_iface.region_tag()
dst_bucket = dst_iface.bucket()
dst_gateways = plan.get_region_gateways(dst_region_tag)

# source region gateway program
obj_store_read = dst_program[dst_region_tag].add_operator(
GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id
)

dst_program[dst_region_tag].add_operator(
GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix),
parent_handle=obj_store_read,
partition_id=partition_id,
)

# update cost per GB
plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag)

# set gateway programs
for dst_region_tag, program in dst_program.items():
plan.set_gateway_program(dst_region_tag, program)
return plan


class DirectPlannerSourceOneSided(MulticastDirectPlanner):
"""Planner that only creates VMs in the source region"""

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
src_region_tag = jobs[0].src_iface.region_tag()
dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces]
# jobs must have same sources and destinations
for job in jobs[1:]:
assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region"
assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set"

plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags)
# TODO: use VM limits to determine how many instances to create in each region
# TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions
for i in range(self.n_instances):
plan.add_gateway(src_region_tag)

# initialize gateway programs per region
src_program = GatewayProgram()

# iterate through all jobs
for job in jobs:
src_bucket = job.src_iface.bucket()
src_region_tag = job.src_iface.region_tag()
src_provider = src_region_tag.split(":")[0]

# give each job a different partition id, so we can read/write to different buckets
partition_id = jobs.index(job)

# source region gateway program
obj_store_read = src_program.add_operator(
GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id
)
# send to all destination
mux_and = src_program.add_operator(GatewayMuxAnd(), parent_handle=obj_store_read, partition_id=partition_id)
dst_prefixes = job.dst_prefixes
for i in range(len(job.dst_ifaces)):
dst_iface = job.dst_ifaces[i]
dst_prefix = dst_prefixes[i]
dst_region_tag = dst_iface.region_tag()
dst_bucket = dst_iface.bucket()
dst_gateways = plan.get_region_gateways(dst_region_tag)

# special case where destination is same region as source
src_program.add_operator(
GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix),
parent_handle=mux_and,
partition_id=partition_id,
)
# update cost per GB
plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag)

# set gateway programs
plan.set_gateway_program(src_region_tag, src_program)
return plan


class DirectPlannerDestOneSided(MulticastDirectPlanner):
"""Planner that only creates instances in the destination region"""

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
# only create in destination region
Expand Down Expand Up @@ -456,42 +566,20 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:


class UnicastILPPlanner(Planner):
def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float):
self.n_instances = n_instances
self.n_connections = n_connections
self.solver_required_throughput_gbits = required_throughput_gbits
super().__init__()

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
raise NotImplementedError("ILP solver not implemented yet")


class MulticastILPPlanner(Planner):
def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float):
self.n_instances = n_instances
self.n_connections = n_connections
self.solver_required_throughput_gbits = required_throughput_gbits
super().__init__()

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
raise NotImplementedError("ILP solver not implemented yet")


class MulticastMDSTPlanner(Planner):
def __init__(self, n_instances: int, n_connections: int):
self.n_instances = n_instances
self.n_connections = n_connections
super().__init__()

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
raise NotImplementedError("MDST solver not implemented yet")


class MulticastSteinerTreePlanner(Planner):
def __init__(self, n_instances: int, n_connections: int):
self.n_instances = n_instances
self.n_connections = n_connections
super().__init__()

def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
raise NotImplementedError("Steiner tree solver not implemented yet")