
ReplicatorClient has duplicated logic to partition chunks #344

Closed
parasj opened this issue May 14, 2022 · 2 comments

parasj commented May 14, 2022

with Halo(text="Preparing replication plan", spinner="dots") as spinner:
    # pre-fetch instance IPs for all gateways
    spinner.text = "Preparing replication plan, fetching instance IPs"
    gateway_ips: Dict[Server, str] = {s: s.public_ip() for s in self.bound_nodes.values()}

    # make list of chunks
    spinner.text = "Preparing replication plan, querying source object store for matching keys"
    chunks = []
    obj_file_size_bytes = job.src_obj_sizes()
    for idx, (src_obj, dest_obj) in enumerate(zip(job.src_objs, job.dest_objs)):
        if obj_file_size_bytes:
            file_size_bytes = obj_file_size_bytes[src_obj]
        else:
            file_size_bytes = job.random_chunk_size_mb * MB
        chunks.append(
            Chunk(src_key=src_obj, dest_key=dest_obj, chunk_id=idx, file_offset_bytes=0, chunk_length_bytes=file_size_bytes)
        )

    # partition chunks into roughly equal-sized batches (by bytes)
    # iteratively adds chunks to the batch with the smallest size
    spinner.text = "Preparing replication plan, partitioning chunks into batches"

    def partition(items: List[Chunk], n_batches: int) -> List[List[Chunk]]:
        batches = [[] for _ in range(n_batches)]
        items.sort(key=lambda c: c.chunk_length_bytes, reverse=True)
        for item in items:
            batch_sizes = [sum(b.chunk_length_bytes for b in bs) for bs in batches]
            batches[batch_sizes.index(min(batch_sizes))].append(item)
        return batches

    src_instances = [self.bound_nodes[n] for n in self.topology.source_instances()]
    chunk_batches = partition(chunks, len(src_instances))
    assert (len(chunk_batches) == (len(src_instances) - 1)) or (
        len(chunk_batches) == len(src_instances)
    ), f"{len(chunk_batches)} batches, expected {len(src_instances)}"
    for batch_idx, batch in enumerate(chunk_batches):
        logger.fs.info(f"Batch {batch_idx} size: {sum(c.chunk_length_bytes for c in batch)} with {len(batch)} chunks")

    # make list of chunks
    # TODO: parallelize for large numbers of objects
    chunks = []
    obj_file_size_bytes = job.src_obj_sizes()
    idx = 0
    for (src_obj, dest_obj) in zip(job.src_objs, job.dest_objs):
        if obj_file_size_bytes:
            if job.max_chunk_size_mb:  # split objects into sub-chunks
                chunk_size_bytes = int(job.max_chunk_size_mb * 1e6)
                num_chunks = int(obj_file_size_bytes[src_obj] / chunk_size_bytes) + 1
                # TODO: figure out what to do on # part limits per object
                # TODO: only do if num_chunks > 1
                # TODO: potentially do this in a separate thread, and/or after chunks sent
                obj_store_interface = ObjectStoreInterface.create(job.dest_region, job.dest_bucket)
                upload_id = obj_store_interface.initiate_multipart_upload(dest_obj)
                offset = 0
                part_num = 1
                parts = []
                for chunk in range(num_chunks):
                    # size is min(chunk_size, remaining data)
                    file_size_bytes = min(chunk_size_bytes, obj_file_size_bytes[src_obj] - offset)
                    assert file_size_bytes > 0, f"File size <= 0 {file_size_bytes}"
                    chunks.append(
                        Chunk(
                            src_key=src_obj,
                            dest_key=dest_obj,
                            chunk_id=idx,
                            file_offset_bytes=offset,
                            chunk_length_bytes=file_size_bytes,
                            part_number=part_num,
                            upload_id=upload_id,
                        )
                    )
                    parts.append(part_num)
                    idx += 1
                    part_num += 1
                    offset += chunk_size_bytes
                # add multipart upload request
                self.multipart_upload_requests.append(
                    {"region": job.dest_region, "bucket": job.dest_bucket, "upload_id": upload_id, "key": dest_obj, "parts": parts}
                )
            else:  # transfer entire object
                file_size_bytes = obj_file_size_bytes[src_obj]
                chunks.append(
                    Chunk(src_key=src_obj, dest_key=dest_obj, chunk_id=idx, file_offset_bytes=0, chunk_length_bytes=file_size_bytes)
                )
                idx += 1
        else:  # random data replication
            file_size_bytes = job.random_chunk_size_mb * MB
            chunks.append(
                Chunk(src_key=src_obj, dest_key=dest_obj, chunk_id=idx, file_offset_bytes=0, chunk_length_bytes=file_size_bytes)
            )
            idx += 1

    # partition chunks into roughly equal-sized batches (by bytes)
    # iteratively adds chunks to the batch with the smallest size
    def partition(items: List[Chunk], n_batches: int) -> List[List[Chunk]]:
        batches = [[] for _ in range(n_batches)]
        items.sort(key=lambda c: c.chunk_length_bytes, reverse=True)
        for item in items:
            batch_sizes = [sum(b.chunk_length_bytes for b in bs) for bs in batches]
            batches[batch_sizes.index(min(batch_sizes))].append(item)
        return batches

    src_instances = [self.bound_nodes[n] for n in self.topology.source_instances()]
    chunk_batches = partition(chunks, len(src_instances))
    assert (len(chunk_batches) == (len(src_instances) - 1)) or (
        len(chunk_batches) == len(src_instances)
    ), f"{len(chunk_batches)} batches, expected {len(src_instances)}"
    for batch_idx, batch in enumerate(chunk_batches):
        logger.info(f"Batch {batch_idx} size: {sum(c.chunk_length_bytes for c in batch)} with {len(batch)} chunks")

    # make list of ChunkRequests
    chunk_requests_sharded: Dict[int, List[ChunkRequest]] = {}
    with Timer("Building chunk requests"):
        # make list of ChunkRequests
        spinner.text = "Preparing replication plan, building list of chunk requests"
        chunk_requests_sharded: Dict[int, List[ChunkRequest]] = {}
        for batch_idx, batch in enumerate(chunk_batches):
            chunk_requests_sharded[batch_idx] = []
            for chunk in batch:
                chunk_requests_sharded[batch_idx].append(
                    ChunkRequest(
                        chunk=chunk,
                        src_region=job.source_region,
                        dst_region=job.dest_region,
                        src_type="object_store" if job.dest_bucket else "random",
                        dst_type="object_store" if job.dest_bucket else "save_local",
                        src_random_size_mb=job.random_chunk_size_mb,
                        src_object_store_bucket=job.source_bucket,
                        dst_object_store_bucket=job.dest_bucket,
                    )
                )
            logger.fs.debug(f"Batch {batch_idx} size: {sum(c.chunk_length_bytes for c in batch)} with {len(batch)} chunks")
            for chunk_request in chunk_requests_sharded[batch_idx]:
                logger.fs.debug(f"\t{chunk_request}")

parasj assigned sarahwooders and unassigned sarahwooders May 14, 2022
parasj commented May 14, 2022

@sarahwooders I think there was a bad merge between your PR #303 and #314. I'll clean it up, but I'm marking this issue so you can verify that I don't break multipart support.
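
One possible shape for the cleanup (a sketch under my own assumptions, not necessarily how the eventual fix implements it): factor chunk creation for a single object into one helper that covers both the multipart and whole-object paths, so the batch-partition step exists in only one place and multipart metadata (offsets, part numbers) is still produced as it is today.

# Hypothetical helper, illustration only -- the name and signature are assumptions,
# not the actual refactor.
from typing import List, Optional

def make_chunks_for_object(
    src_key: str, dest_key: str, size_bytes: int, start_id: int, max_chunk_size_mb: Optional[float] = None
) -> List[dict]:
    # whole-object transfer when no multipart threshold is set or the object fits in one chunk
    if not max_chunk_size_mb or size_bytes <= int(max_chunk_size_mb * 1e6):
        return [dict(src_key=src_key, dest_key=dest_key, chunk_id=start_id, file_offset_bytes=0, chunk_length_bytes=size_bytes)]
    # otherwise split into fixed-size sub-chunks, tracking offset and part number
    chunk_size = int(max_chunk_size_mb * 1e6)
    chunks, offset, part_num = [], 0, 1
    while offset < size_bytes:
        length = min(chunk_size, size_bytes - offset)
        chunks.append(
            dict(
                src_key=src_key,
                dest_key=dest_key,
                chunk_id=start_id + part_num - 1,
                file_offset_bytes=offset,
                chunk_length_bytes=length,
                part_number=part_num,
            )
        )
        offset += length
        part_num += 1
    return chunks

The caller would still initiate the multipart upload, record the upload_id and parts list per object, and run the single partition() pass over the combined chunk list, which is the point to verify for multipart support.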

parasj self-assigned this May 14, 2022
parasj commented May 17, 2022

Fixed in #348

parasj closed this as completed May 17, 2022