Determine object store keys from listing bucket (#127)
sarahwooders authored Feb 1, 2022
1 parent fbf4838 commit ffee9e6
Showing 4 changed files with 13 additions and 7 deletions.
13 changes: 10 additions & 3 deletions skylark/cli/cli.py
@@ -45,6 +45,7 @@
)
from skylark.replicate.replication_plan import ReplicationJob, ReplicationTopology
from skylark.replicate.replicator_client import ReplicatorClient
+ from skylark.obj_store.object_store_interface import ObjectStoreInterface

app = typer.Typer(name="skylark")
app.add_typer(skylark.cli.experiments.app, name="experiments")
@@ -260,16 +261,22 @@ def replicate_json(
job = rc.run_replication_plan(job)
total_bytes = n_chunks * chunk_size_mb * MB
else:
# TODO: Don't hardcode n_chunks
# TODO: Don't hardcode obj keys

+ # get object keys with prefix
+ objs = ObjectStoreInterface.create(topo.source_region(), source_bucket).list_objects(key_prefix)
+ obj_keys = list([obj.key for obj in objs])

# create replication job
job = ReplicationJob(
source_region=topo.source_region(),
source_bucket=source_bucket,
dest_region=topo.sink_region(),
dest_bucket=dest_bucket,
objs=[f"{key_prefix}/{i}" for i in range(n_chunks)],
objs=obj_keys,
)
job = rc.run_replication_plan(job)

# query chunk sizes
total_bytes = sum([chunk_req.chunk.chunk_length_bytes for chunk_req in job.chunk_requests])

logger.info(f"{total_bytes / GB:.2f}GByte replication job launched")
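
The heart of this change is in cli.py: instead of synthesizing keys as f"{key_prefix}/{i}" for a hardcoded n_chunks, the CLI now asks the object store which keys actually exist under the prefix, then reads chunk sizes back from job.chunk_requests. A minimal standalone sketch of the listing pattern, assuming an illustrative region string, bucket, and prefix (not from the commit):

from skylark.obj_store.object_store_interface import ObjectStoreInterface

# List every object under a prefix and collect the keys, as the new
# replicate_json code path does before building a ReplicationJob.
objs = ObjectStoreInterface.create("aws:us-east-1", "example-source-bucket").list_objects("data/")
obj_keys = [obj.key for obj in objs]
print(f"{len(obj_keys)} objects to replicate")
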
3 changes: 1 addition & 2 deletions skylark/gateway/gateway_daemon.py
@@ -37,8 +37,7 @@ def __init__(self, region: str, outgoing_ports: Dict[str, int], chunk_dir: PathL
self.gateway_sender = GatewaySender(chunk_store=self.chunk_store, outgoing_ports=outgoing_ports)
print(outgoing_ports)

- self.obj_store_conn = GatewayObjStoreConn(chunk_store=self.chunk_store, max_conn=32)
- #self.obj_store_interfaces: Dict[str, ObjectStoreInterface] = {}
+ self.obj_store_conn = GatewayObjStoreConn(chunk_store=self.chunk_store, max_conn=16)

# Download thread pool
self.dl_pool_semaphore = BoundedSemaphore(value=128)
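
The daemon change halves the object-store connection pool (max_conn 32 → 16). Concurrency is bounded separately by the download semaphore visible in the context above; a small sketch of that semaphore pattern, with a sleep standing in for the real chunk fetch (names and values here are illustrative):

import time
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore

# Allow at most 128 downloads in flight, mirroring dl_pool_semaphore.
dl_pool_semaphore = BoundedSemaphore(value=128)
pool = ThreadPoolExecutor(max_workers=16)

def download_chunk(chunk_id: int) -> None:
    with dl_pool_semaphore:
        time.sleep(0.01)  # stand-in for the object-store fetch

for chunk_id in range(256):
    pool.submit(download_chunk, chunk_id)
pool.shutdown(wait=True)
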
2 changes: 1 addition & 1 deletion skylark/obj_store/gcs_interface.py
@@ -26,7 +26,7 @@ def __init__(self, gcp_region, bucket_name):
self._gcs_client = storage.Client()

# TODO: set number of threads
- self.pool = ThreadPoolExecutor(max_workers=256)
+ self.pool = ThreadPoolExecutor(max_workers=8)

def _on_done_download(self, **kwargs):
self.completed_downloads += 1
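
gcs_interface.py drops its download pool from 256 workers to 8, and the context above shows a completion callback (_on_done_download) that counts finished transfers. One way to wire a pool to such a counter with concurrent.futures (an assumption for illustration, not necessarily how the interface itself does it):

from concurrent.futures import ThreadPoolExecutor
from threading import Lock

completed_downloads = 0
count_lock = Lock()

def on_done_download(future) -> None:
    # Completion counter in the spirit of _on_done_download above.
    global completed_downloads
    with count_lock:
        completed_downloads += 1

pool = ThreadPoolExecutor(max_workers=8)
for key in ["a.bin", "b.bin", "c.bin"]:
    fut = pool.submit(len, key)  # placeholder for a GCS blob download
    fut.add_done_callback(on_done_download)
pool.shutdown(wait=True)
print(f"completed {completed_downloads} downloads")
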
2 changes: 1 addition & 1 deletion skylark/obj_store/s3_interface.py
@@ -29,7 +29,7 @@ def __init__(self, aws_region, bucket_name, use_tls=True, part_size=None, throug
self.s3_throughput_target_gbps = throughput_target_gbps
#num_threads=os.cpu_count()
#num_threads=256
- num_threads=256
+ num_threads=4 #256
event_loop_group = EventLoopGroup(num_threads=num_threads, cpu_group=None)
host_resolver = DefaultHostResolver(event_loop_group)
bootstrap = ClientBootstrap(event_loop_group, host_resolver)
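
The S3 interface sizes its AWS CRT event loop group explicitly, and this commit cuts the thread count from 256 to 4. The construction in the diff, reproduced as a standalone sketch (requires the awscrt package; the S3 client built on top of the bootstrap is omitted):

from awscrt.io import ClientBootstrap, DefaultHostResolver, EventLoopGroup

# A small event loop group: 4 I/O threads, no CPU-group pinning.
num_threads = 4
event_loop_group = EventLoopGroup(num_threads=num_threads, cpu_group=None)
host_resolver = DefaultHostResolver(event_loop_group)
bootstrap = ClientBootstrap(event_loop_group, host_resolver)

The commented-out alternatives left in the diff (os.cpu_count(), 256) suggest this value was still being tuned downward.
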
