Skip to content

Commit

Permalink
Add an option to upload chunks in parallel
Browse files Browse the repository at this point in the history
[noissue]
  • Loading branch information
lubosmj committed Jul 11, 2024
1 parent 0b73cda commit 8d57381
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
66 changes: 63 additions & 3 deletions pulp-glue/pulp_glue/core/context.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from multiprocess import Pool

import datetime
import hashlib
import os
Expand Down Expand Up @@ -45,7 +47,11 @@ class PulpArtifactContext(PulpEntityContext):
ID_PREFIX = "artifacts"

def upload(
self, file: t.IO[bytes], chunk_size: int = 1000000, sha256: t.Optional[str] = None
self,
file: t.IO[bytes],
chunk_size: int = 1000000,
parallel: bool = False,
sha256: t.Optional[str] = None,
) -> t.Any:
size = os.path.getsize(file.name)

Expand Down Expand Up @@ -74,7 +80,7 @@ def upload(
return artifact["pulp_href"]

upload_ctx = PulpUploadContext(self.pulp_ctx)
upload_ctx.upload_file(file, chunk_size)
upload_ctx.upload_file(file, chunk_size, parallel)

self.pulp_ctx.echo(_("Creating artifact."), err=True)
try:
Expand Down Expand Up @@ -492,8 +498,17 @@ def commit(self, sha256: str) -> t.Any:
body={"sha256": sha256},
)

def upload_file(self, file: t.IO[bytes], chunk_size: int = 1000000) -> t.Any:
def upload_file(
    self, file: t.IO[bytes], chunk_size: int = 1000000, parallel: bool = False
) -> t.Any:
    """Upload a file and return the uncommitted upload_href.

    Args:
        file: Binary file object to upload.
        chunk_size: Maximum number of bytes sent per chunk.
        parallel: When true, delegate to ``upload_file_in_parallel``;
            otherwise delegate to ``upload_file_in_serial``.

    Returns:
        Whatever the selected helper returns (the upload_href).
    """
    # The helper's result must be propagated; without the explicit return this
    # method would always yield None despite what the docstring promises.
    if parallel:
        return self.upload_file_in_parallel(file, chunk_size)
    return self.upload_file_in_serial(file, chunk_size)

def upload_file_in_serial(self, file: t.IO[bytes], chunk_size: int = 1000000):
"""Upload a file in serial and return uncommitted upload_href."""
start = 0
size = os.path.getsize(file.name)
upload_href = self.create(body={"size": size})["pulp_href"]
Expand All @@ -514,6 +529,51 @@ def upload_file(self, file: t.IO[bytes], chunk_size: int = 1000000) -> t.Any:
self.pulp_ctx.echo(_("Upload complete."), err=True)
return upload_href

def upload_file_in_parallel(self, file: t.IO[bytes], chunk_size: int = 1000000) -> t.Any:
    """Upload a file in up to 4 processes and return the uncommitted upload_href.

    The file is split into contiguous half-open byte ranges ``[begin, end)``,
    one per worker process. Each worker runs ``up`` on its own range and sends
    it in chunks of at most ``chunk_size`` bytes.

    Args:
        file: Binary file object to upload; its ``name`` is used to get the size.
        chunk_size: Maximum number of bytes sent per chunk.

    Returns:
        The ``pulp_href`` of the upload created for this file.
    """
    workers = 4
    size = os.path.getsize(file.name)
    piece_size = size // workers
    upload_href = self.create(body={"size": size})["pulp_href"]

    # Range boundaries: 0, p, 2p, 3p, size. Ending the last range at `size`
    # folds the division remainder into it, so every byte is covered exactly
    # once (the previous separate remainder piece started one byte too late).
    bounds = [index * piece_size for index in range(workers)] + [size]
    args = [
        [self, upload_href, size, chunk_size, file, begin, end]
        for begin, end in zip(bounds, bounds[1:])
        if begin < end  # files smaller than `workers` bytes yield empty ranges
    ]

    if args:
        # The context manager tears the pool down once map() has returned.
        with Pool(processes=len(args)) as pool:
            pool.map(up, args)

    return upload_href


def up(args):
    """Upload one byte range of a file to an existing upload.

    ``args`` is a 7-item sequence, unpacked in this order: the upload context,
    the upload href, the total file size, the chunk size, the file object, the
    first offset of the range and the (exclusive) end offset of the range.

    If anything fails while uploading, the upload is deleted through the
    context and the exception is re-raised.
    """
    upload_ctx, href, total_size, chunk_size, stream, offset, end = args
    try:
        stream.seek(offset)
        upload_ctx.pulp_href = href
        while offset < end:
            # Never read past the end of this range.
            chunk = stream.read(min(chunk_size, end - offset))
            upload_ctx.upload_chunk(
                chunk=chunk,
                size=total_size,
                start=offset,
            )
            offset += chunk_size
            upload_ctx.pulp_ctx.echo(".", nl=False, err=True)
    except Exception:
        upload_ctx.delete(href)
        raise
    upload_ctx.pulp_ctx.echo(_("Upload complete."), err=True)


class PulpUserContext(PulpEntityContext):
ENTITY = _("user")
Expand Down
1 change: 1 addition & 0 deletions pulp-glue/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ classifiers = [
dependencies = [
"packaging>=20.0,<24",
"requests>=2.24.0,<2.32",
"multiprocess==0.70.16",
"importlib_resources>=5.4.0,<6.2;python_version<'3.9'",
]

Expand Down

0 comments on commit 8d57381

Please sign in to comment.