Merge pull request #29 from valohai/renovate

Renovate tooling (and reformat with Black)

akx authored Jul 21, 2023
2 parents 99c0ad0 + 1851ed7 · commit 3391d17
Showing 17 changed files with 277 additions and 211 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/ci.yml
@@ -21,10 +21,10 @@ jobs:
           - '3.11'
     steps:
       - name: 'Set up Python ${{ matrix.python-version }}'
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v4
         with:
           python-version: '${{ matrix.python-version }}'
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v3
       - run: pip install -e . -r requirements-test.txt
       - run: py.test -vvv --cov .
       - uses: codecov/codecov-action@v3
@@ -36,23 +36,23 @@ jobs:
   Lint:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/setup-python@v2
+      - uses: actions/setup-python@v4
         with:
           python-version: '3.11'
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v3
       - run: pip install -e . pre-commit
       - run: pre-commit run --all-files

   Build:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/setup-python@v2
+      - uses: actions/setup-python@v4
         with:
           python-version: '3.11'
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v3
       - run: pip install build
       - run: python -m build .
-      - uses: actions/upload-artifact@v2
+      - uses: actions/upload-artifact@v3
         with:
           name: dist
           path: dist/
11 changes: 8 additions & 3 deletions .pre-commit-config.yaml
@@ -7,20 +7,25 @@ repos:
       - id: end-of-file-fixer
       - id: trailing-whitespace

-  - repo: https://github.com/charliermarsh/ruff-pre-commit
-    rev: v0.0.254
+  - repo: https://github.com/astral-sh/ruff-pre-commit
+    rev: v0.0.278
     hooks:
       - id: ruff
         args:
          - --fix
          - --exit-non-zero-on-fix

   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v1.0.1
+    rev: v1.4.1
     hooks:
       - id: mypy
         exclude: hai_tests/test_.*
         args:
           - --install-types
           - --non-interactive
           - --scripts-are-modules
+
+  - repo: https://github.com/psf/black
+    rev: 23.7.0
+    hooks:
+      - id: black
1 change: 0 additions & 1 deletion LICENSE
@@ -1,4 +1,3 @@
-
 The MIT License (MIT)

 Copyright (c) 2018 Valohai
2 changes: 1 addition & 1 deletion hai/__init__.py
@@ -1 +1 @@
-__version__ = '0.2.1'
+__version__ = "0.2.1"
82 changes: 51 additions & 31 deletions hai/boto3_multipart_upload.py
@@ -13,8 +13,8 @@

 class MultipartUploader(EventEmitter):
     event_types = {
-        'progress',
-        'part-error',
+        "progress",
+        "part-error",
     }
     part_retry_attempts = 10
     minimum_file_size = S3_MINIMUM_MULTIPART_FILE_SIZE
@@ -28,7 +28,7 @@ def __init__(self, s3: BaseClient, log: Optional[logging.Logger] = None) -> None
         :param log: A logger, if desired.
         """
         self.s3 = s3
-        self.log = (log or logging.getLogger(self.__class__.__name__))
+        self.log = log or logging.getLogger(self.__class__.__name__)

     def upload_parts(
         self,
@@ -63,44 +63,55 @@ def upload_parts(
                             Bucket=bucket,
                             Key=key,
                             PartNumber=part_number,
-                            UploadId=mpu['UploadId'],
+                            UploadId=mpu["UploadId"],
                             Body=chunk,
                         )
                     except Exception as exc:
-                        self.log.error(f'Error uploading part {part_number} (attempt {attempt})', exc_info=True)
-                        self.emit('part-error', {
-                            'chunk': part_number,
-                            'attempt': attempt,
-                            'attempts_left': self.part_retry_attempts - attempt,
-                            'exception': exc,
-                        })
+                        self.log.error(
+                            f"Error uploading part {part_number} (attempt {attempt})",
+                            exc_info=True,
+                        )
+                        self.emit(
+                            "part-error",
+                            {
+                                "chunk": part_number,
+                                "attempt": attempt,
+                                "attempts_left": self.part_retry_attempts - attempt,
+                                "exception": exc,
+                            },
+                        )
                         if attempt == self.part_retry_attempts - 1:
                             raise
                     else:
                         bytes += len(chunk)
-                        part_infos.append({'PartNumber': part_number, 'ETag': part['ETag']})
-                        self.emit('progress', {
-                            'part_number': part_number,
-                            'part': part,
-                            'bytes_uploaded': bytes,
-                        })
+                        part_infos.append(
+                            {"PartNumber": part_number, "ETag": part["ETag"]},
+                        )
+                        self.emit(
+                            "progress",
+                            {
+                                "part_number": part_number,
+                                "part": part,
+                                "bytes_uploaded": bytes,
+                            },
+                        )
                         break
         except:  # noqa
-            self.log.debug('Aborting multipart upload')
+            self.log.debug("Aborting multipart upload")
             self.s3.abort_multipart_upload(
                 Bucket=bucket,
                 Key=key,
-                UploadId=mpu['UploadId'],
+                UploadId=mpu["UploadId"],
             )
             raise

-        self.log.info('Completing multipart upload')
+        self.log.info("Completing multipart upload")

         return self.s3.complete_multipart_upload(  # type: ignore[no-any-return]
             Bucket=bucket,
             Key=key,
-            UploadId=mpu['UploadId'],
-            MultipartUpload={'Parts': part_infos},
+            UploadId=mpu["UploadId"],
+            MultipartUpload={"Parts": part_infos},
         )

     def read_chunk(self, fp: IO[bytes], size: int) -> bytes:
@@ -130,8 +141,8 @@ def upload_file(
         These roughly correspond to what one might be able to pass to `put_object`.
         :return: The return value of the `complete_multipart_upload` call.
         """
-        if not hasattr(fp, 'read'):  # pragma: no cover
-            raise TypeError('`fp` must have a `read()` method')
+        if not hasattr(fp, "read"):  # pragma: no cover
+            raise TypeError("`fp` must have a `read()` method")

         try:
             size = os.stat(fp.fileno()).st_size
@@ -140,8 +151,8 @@ def upload_file(

         if size and size <= self.minimum_file_size:
             raise ValueError(
-                f'File is too small to upload as multipart {size} bytes '
-                f'(must be at least {self.minimum_file_size} bytes)'
+                f"File is too small to upload as multipart {size} bytes "
+                f"(must be at least {self.minimum_file_size} bytes)",
             )

         if not chunk_size:
@@ -150,20 +161,29 @@ def upload_file(
             maximum = min(S3_MAXIMUM_MULTIPART_CHUNK_SIZE, self.maximum_chunk_size)
             chunk_size = int(max(minimum, min(chunk_size, maximum)))

-        if not S3_MINIMUM_MULTIPART_CHUNK_SIZE <= chunk_size < S3_MAXIMUM_MULTIPART_CHUNK_SIZE:
+        if (
+            not S3_MINIMUM_MULTIPART_CHUNK_SIZE
+            <= chunk_size
+            < S3_MAXIMUM_MULTIPART_CHUNK_SIZE
+        ):
             raise ValueError(
-                f'Chunk size {chunk_size} is outside the protocol limits '
-                f'({S3_MINIMUM_MULTIPART_CHUNK_SIZE}..{S3_MAXIMUM_MULTIPART_CHUNK_SIZE})'
+                f"Chunk size {chunk_size} is outside the protocol limits "
+                f"({S3_MINIMUM_MULTIPART_CHUNK_SIZE}..{S3_MAXIMUM_MULTIPART_CHUNK_SIZE})",
            )

         def chunk_generator() -> Generator[bytes, None, None]:
             while True:
-                chunk = self.read_chunk(fp, chunk_size)  # type: ignore[arg-type]
+                chunk = self.read_chunk(fp, chunk_size)
                 if not chunk:
                     break
                 yield chunk

-        return self.upload_parts(bucket, key, parts=chunk_generator(), create_params=create_params)
+        return self.upload_parts(
+            bucket,
+            key,
+            parts=chunk_generator(),
+            create_params=create_params,
+        )

     def determine_chunk_size_from_file_size(self, file_size: Optional[int]) -> int:
         if file_size:
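Note: the changes above are purely stylistic (Black formatting, double quotes); behavior is unchanged. For orientation, a minimal usage sketch of this class follows. The bucket name, file name, and credentials setup are hypothetical, and the argument order of `upload_file` is inferred from the `upload_parts(bucket, key, ...)` call in the diff:

    import boto3

    from hai.boto3_multipart_upload import MultipartUploader

    s3 = boto3.client("s3")  # assumes AWS credentials are configured in the environment
    uploader = MultipartUploader(s3)

    # "progress" handlers are called with keyword arguments, including bytes_uploaded.
    uploader.on("progress", lambda **args: print(args["bytes_uploaded"], "bytes uploaded"))

    # upload_file() splits the stream into chunks and drives upload_parts();
    # the file must be larger than minimum_file_size, or a ValueError is raised.
    with open("big_file.bin", "rb") as fp:  # hypothetical local file
        uploader.upload_file("my-bucket", "big_file.bin", fp)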
25 changes: 14 additions & 11 deletions hai/event_emitter.py
@@ -1,6 +1,6 @@
 from typing import Any, Callable, Dict, Optional, Set

-DICT_NAME = '_event_emitter_dict'
+DICT_NAME = "_event_emitter_dict"

 Handler = Callable[..., Any]

@@ -13,24 +13,27 @@ class EventEmitter:
     event_types: Set[str] = set()

     def on(self, event: str, handler: Handler) -> None:
-        if event != '*' and event not in self.event_types:
-            raise ValueError(f'event type {event} is not known')
+        if event != "*" and event not in self.event_types:
+            raise ValueError(f"event type {event} is not known")

         _get_event_emitter_dict(self).setdefault(event, set()).add(handler)

     def off(self, event: str, handler: Handler) -> None:
         _get_event_emitter_dict(self).get(event, set()).discard(handler)

-    def emit(self, event: str, args: Optional[Dict[str, Any]] = None, quiet: bool = True) -> None:
+    def emit(
+        self,
+        event: str,
+        args: Optional[Dict[str, Any]] = None,
+        quiet: bool = True,
+    ) -> None:
         if event not in self.event_types:
-            raise ValueError(f'event type {event} is not known')
+            raise ValueError(f"event type {event} is not known")
         emitter_dict = _get_event_emitter_dict(self)
-        handlers = (
-            emitter_dict.get(event, set()) | emitter_dict.get('*', set())
-        )
-        args = (args or {})
-        args.setdefault('sender', self)
-        args.setdefault('event', event)
+        handlers = emitter_dict.get(event, set()) | emitter_dict.get("*", set())
+        args = args or {}
+        args.setdefault("sender", self)
+        args.setdefault("event", event)
         for handler in handlers:
             try:
                 handler(**args)
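The hunks above show the full emit() contract: handlers are invoked with keyword arguments, and `sender` and `event` are filled in via setdefault. A minimal sketch of how this is used (the Downloader class and handler are illustrative, not part of the diff):

    from hai.event_emitter import EventEmitter


    class Downloader(EventEmitter):
        event_types = {"progress"}

        def run(self) -> None:
            # emit() adds sender=self and event="progress" before calling handlers.
            self.emit("progress", {"percent": 100})


    def on_progress(sender, event, percent):
        print(f"{event} from {sender!r}: {percent}%")


    dl = Downloader()
    dl.on("progress", on_progress)  # "*" would subscribe to every event type
    dl.run()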
25 changes: 17 additions & 8 deletions hai/parallel.py
@@ -11,7 +11,12 @@ class ParallelException(Exception):


 class TaskFailed(ParallelException):
-    def __init__(self, message: str, task: "ApplyResult[Any]", exception: Exception) -> None:
+    def __init__(
+        self,
+        message: str,
+        task: "ApplyResult[Any]",
+        exception: Exception,
+    ) -> None:
         super().__init__(message)
         self.task = task
         self.task_name = str(getattr(task, "name", None))
@@ -43,7 +48,9 @@ class ParallelRun:
     """

     def __init__(self, parallelism: Optional[int] = None) -> None:
-        self.pool = ThreadPool(processes=(parallelism or (int(os.cpu_count() or 1) * 2)))
+        self.pool = ThreadPool(
+            processes=(parallelism or (int(os.cpu_count() or 1) * 2)),
+        )
         self.task_complete_event = threading.Event()
         self.tasks: List[ApplyResult[Any]] = []
         self.completed_tasks: WeakSet[ApplyResult[Any]] = WeakSet()
@@ -79,7 +86,7 @@ def add_task(
         :param kwargs: Keyword arguments, if any.
         """
         if not name:
-            name = (getattr(task, '__name__' or None) or str(task))  # type: ignore[arg-type]
+            name = getattr(task, "__name__" or None) or str(task)  # type: ignore[arg-type]
         p_task = self.pool.apply_async(
             task,
             args=args,
@@ -102,7 +109,7 @@ def wait(
         fail_fast: bool = True,
         interval: float = 0.5,
         callback: Optional[Callable[["ParallelRun"], None]] = None,
-        max_wait: Optional[float] = None
+        max_wait: Optional[float] = None,
     ) -> List["ApplyResult[Any]"]:
         """
         Wait until all of the current tasks have finished,
@@ -136,7 +143,7 @@ def wait(

         while True:
             if max_wait:
-                waited_for = (time.time() - start_time)
+                waited_for = time.time() - start_time
                 if waited_for > max_wait:
                     raise TimeoutError(f"Waited for {waited_for}/{max_wait} seconds.")

@@ -159,7 +166,9 @@ def wait(
         # Reset the flag in case it had been set
         self.task_complete_event.clear()

-        return list(self.completed_tasks)  # We can just as well return the completed tasks.
+        return list(
+            self.completed_tasks,
+        )  # We can just as well return the completed tasks.

     def _wait_tick(self, fail_fast: bool) -> bool:
         # Keep track of whether there were any incomplete tasks this loop.
@@ -189,7 +198,7 @@ def _wait_tick(self, fail_fast: bool) -> bool:
             # raising the exception directly.
             if fail_fast and not task._success:  # type: ignore[attr-defined]
                 exc = task._value  # type: ignore[attr-defined]
-                message = f'[{task.name}] {str(exc)}'  # type: ignore[attr-defined]
+                message = f"[{task.name}] {str(exc)}"  # type: ignore[attr-defined]
                 raise TaskFailed(
                     message,
                     task=task,
@@ -205,7 +214,7 @@ def maybe_raise(self) -> None:
         exceptions = self.exceptions
         if exceptions:
             raise TasksFailed(
-                '%d exceptions occurred' % len(exceptions),
+                f"{len(exceptions)} exceptions occurred",
                 exception_map=exceptions,
             )
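For orientation, ParallelRun is driven roughly as sketched below; the task function is illustrative, and the keyword names follow the signatures visible in the hunks above (`name`, `args`, `fail_fast`):

    import time

    from hai.parallel import ParallelRun


    def crunch(n: int) -> int:
        time.sleep(0.1)  # stand-in for real work
        return n * n


    pr = ParallelRun()
    for i in range(4):
        pr.add_task(crunch, name=f"crunch-{i}", args=(i,))

    tasks = pr.wait()  # blocks until done; with fail_fast=True a failure raises TaskFailed
    pr.maybe_raise()   # with fail_fast=False, raises TasksFailed if any task failed
    print([t.get() for t in tasks])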