Skip to content

Commit

Permalink
multiprocess_lfs
Browse files Browse the repository at this point in the history
  • Loading branch information
aj-ya committed May 19, 2024
1 parent 8b6502b commit 4e4ec6f
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 35 deletions.
63 changes: 28 additions & 35 deletions outpostcli/lfs/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import os
import subprocess
import sys
from multiprocessing import cpu_count
from typing import Dict, List, Optional

import click
import requests
from outpostkit.repository.lfs.logger import create_lfs_logger

from outpostcli.constants import CLI_BINARY_NAME
from outpostcli.lfs.file_slice import FileSlice
from outpostcli.lfs.types import UploadedPartObject
from outpostcli.lfs.parallel import multimap
from outpostcli.lfs.part import transfer_part
from outpostcli.lfs.utils import part_dict_list_to_xml
from outpostcli.utils import click_group

Expand All @@ -22,7 +23,6 @@ def lfs():


_log = create_lfs_logger(__name__)
_log.setLevel(10)

MULTIPART_UPLOAD_COMMAND_NAME = "multipart-upload"
LFS_MULTIPART_UPLOAD_COMMAND = f"lfs {MULTIPART_UPLOAD_COMMAND_NAME}"
Expand Down Expand Up @@ -149,39 +149,32 @@ def on_progress(oid: str, uploaded_bytes: int):
# for k, v in header.items():
# pNo = try_extracting_part_number(k)
# if pNo:
# parts.append((pNo, v))
# parts.append((pNo, v))s
parts = []
for i, presigned_url in enumerate(presigned_urls):
with FileSlice(
filepath, seek_from=i * chunk_size, read_limit=chunk_size
) as data:
try:
_log.info({i, presigned_url})
r = requests.put(presigned_url, data=data)
r.raise_for_status()
parts.append(
UploadedPartObject(
{
"etag": str(r.headers.get("etag")),
"part_number": i + 1,
}
)
)
# In order to support progress reporting while data is uploading / downloading,
# the transfer process should post messages to stdout
write_msg(
{
"event": "progress",
"oid": oid,
"bytesSoFar": (i + 1) * chunk_size,
"bytesSinceLast": chunk_size,
}
)
except Exception as e:
_log.error(e)
raise
# Not precise but that's ok.

cores = cpu_count()
_log.info({cores})
with multimap(cores) as pmap:
for resp in pmap(
transfer_part,
(
(filepath, i + 1, chunk_size, part)
for (i, part) in enumerate(presigned_urls)
),
):
# write_msg(
# {
# "event": "progress",
# "oid": oid,
# "bytesSoFar": (i + 1) * chunk_size,
# "bytesSinceLast": chunk_size,
# }
# )
_log.info(resp)
parts.append(resp)
pass
# for i, presigned_url in enumerate(presigned_urls):

# Not precise but that's ok.
_log.info(parts)
r = requests.post(
completion_url,
Expand Down
34 changes: 34 additions & 0 deletions outpostcli/lfs/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import contextlib
import multiprocessing
from functools import wraps
from multiprocessing.pool import IMapIterator


@contextlib.contextmanager
def multimap(cores=None):
"""
Provide multiprocessing imap like function.
The context manager handles setting up the pool, worked around interrupt issues
and terminating the pool on completion.
"""
if cores is None:
cores = max(multiprocessing.cpu_count() - 1, 1)

def wrapper(func):
def wrap(self, timeout=None):
return func(self, timeout=timeout if timeout is not None else 1e100)

return wrap

IMapIterator.next = wrapper(IMapIterator.next)
pool = multiprocessing.Pool(cores)
yield pool.imap
pool.terminate()


def map_wrap(f):
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)

return wrapper
26 changes: 26 additions & 0 deletions outpostcli/lfs/part.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import requests
from outpostkit.repository.lfs.logger import create_lfs_logger

from outpostcli.lfs.file_slice import FileSlice
from outpostcli.lfs.types import UploadedPartObject

_log = create_lfs_logger(__name__)


def transfer_part(filepath: str, part_number: int, chunk_size: int, presigned_url: str):
with FileSlice(
filepath, seek_from=(part_number - 1) * chunk_size, read_limit=chunk_size
) as data:
try:
r = requests.put(presigned_url, data=data)
r.raise_for_status()

return UploadedPartObject(
{
"etag": str(r.headers.get("etag")),
"part_number": part_number,
}
)
except Exception as e:
_log.error(e)
raise

0 comments on commit 4e4ec6f

Please sign in to comment.