Format with Black, with trailing commas
valohai-bot authored and akx committed Jul 21, 2023
1 parent 13ed437 commit 1851ed7
Showing 14 changed files with 240 additions and 174 deletions.
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
@@ -24,3 +24,8 @@ repos:
- --install-types
- --non-interactive
- --scripts-are-modules

- repo: https://github.com/psf/black
rev: 23.7.0
hooks:
- id: black
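
The "trailing commas" in the commit title refer to Black's magic trailing comma: when a call, list, or dict literal already ends in a trailing comma, Black keeps it expanded one element per line instead of collapsing it. A small illustrative Python sketch of that behaviour (not taken from this repository):

# Without a trailing comma, Black collapses short literals onto one line:
colors = ["red", "green", "blue"]

# With a trailing ("magic") comma, Black keeps the literal expanded,
# one element per line -- the same pattern applied throughout this diff:
colors = [
    "red",
    "green",
    "blue",
]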
2 changes: 1 addition & 1 deletion hai/__init__.py
@@ -1 +1 @@
__version__ = '0.2.1'
__version__ = "0.2.1"
80 changes: 50 additions & 30 deletions hai/boto3_multipart_upload.py
@@ -13,8 +13,8 @@

class MultipartUploader(EventEmitter):
event_types = {
'progress',
'part-error',
"progress",
"part-error",
}
part_retry_attempts = 10
minimum_file_size = S3_MINIMUM_MULTIPART_FILE_SIZE
@@ -28,7 +28,7 @@ def __init__(self, s3: BaseClient, log: Optional[logging.Logger] = None) -> None
:param log: A logger, if desired.
"""
self.s3 = s3
self.log = (log or logging.getLogger(self.__class__.__name__))
self.log = log or logging.getLogger(self.__class__.__name__)

def upload_parts(
self,
@@ -63,44 +63,55 @@ def upload_parts(
Bucket=bucket,
Key=key,
PartNumber=part_number,
UploadId=mpu['UploadId'],
UploadId=mpu["UploadId"],
Body=chunk,
)
except Exception as exc:
self.log.error(f'Error uploading part {part_number} (attempt {attempt})', exc_info=True)
self.emit('part-error', {
'chunk': part_number,
'attempt': attempt,
'attempts_left': self.part_retry_attempts - attempt,
'exception': exc,
})
self.log.error(
f"Error uploading part {part_number} (attempt {attempt})",
exc_info=True,
)
self.emit(
"part-error",
{
"chunk": part_number,
"attempt": attempt,
"attempts_left": self.part_retry_attempts - attempt,
"exception": exc,
},
)
if attempt == self.part_retry_attempts - 1:
raise
else:
bytes += len(chunk)
part_infos.append({'PartNumber': part_number, 'ETag': part['ETag']})
self.emit('progress', {
'part_number': part_number,
'part': part,
'bytes_uploaded': bytes,
})
part_infos.append(
{"PartNumber": part_number, "ETag": part["ETag"]},
)
self.emit(
"progress",
{
"part_number": part_number,
"part": part,
"bytes_uploaded": bytes,
},
)
break
except: # noqa
self.log.debug('Aborting multipart upload')
self.log.debug("Aborting multipart upload")
self.s3.abort_multipart_upload(
Bucket=bucket,
Key=key,
UploadId=mpu['UploadId'],
UploadId=mpu["UploadId"],
)
raise

self.log.info('Completing multipart upload')
self.log.info("Completing multipart upload")

return self.s3.complete_multipart_upload( # type: ignore[no-any-return]
Bucket=bucket,
Key=key,
UploadId=mpu['UploadId'],
MultipartUpload={'Parts': part_infos},
UploadId=mpu["UploadId"],
MultipartUpload={"Parts": part_infos},
)

def read_chunk(self, fp: IO[bytes], size: int) -> bytes:
Expand Down Expand Up @@ -130,8 +141,8 @@ def upload_file(
These roughly correspond to what one might be able to pass to `put_object`.
:return: The return value of the `complete_multipart_upload` call.
"""
if not hasattr(fp, 'read'): # pragma: no cover
raise TypeError('`fp` must have a `read()` method')
if not hasattr(fp, "read"): # pragma: no cover
raise TypeError("`fp` must have a `read()` method")

try:
size = os.stat(fp.fileno()).st_size
@@ -140,8 +151,8 @@

if size and size <= self.minimum_file_size:
raise ValueError(
f'File is too small to upload as multipart {size} bytes '
f'(must be at least {self.minimum_file_size} bytes)'
f"File is too small to upload as multipart {size} bytes "
f"(must be at least {self.minimum_file_size} bytes)",
)

if not chunk_size:
@@ -150,10 +161,14 @@
maximum = min(S3_MAXIMUM_MULTIPART_CHUNK_SIZE, self.maximum_chunk_size)
chunk_size = int(max(minimum, min(chunk_size, maximum)))

if not S3_MINIMUM_MULTIPART_CHUNK_SIZE <= chunk_size < S3_MAXIMUM_MULTIPART_CHUNK_SIZE:
if (
not S3_MINIMUM_MULTIPART_CHUNK_SIZE
<= chunk_size
< S3_MAXIMUM_MULTIPART_CHUNK_SIZE
):
raise ValueError(
f'Chunk size {chunk_size} is outside the protocol limits '
f'({S3_MINIMUM_MULTIPART_CHUNK_SIZE}..{S3_MAXIMUM_MULTIPART_CHUNK_SIZE})'
f"Chunk size {chunk_size} is outside the protocol limits "
f"({S3_MINIMUM_MULTIPART_CHUNK_SIZE}..{S3_MAXIMUM_MULTIPART_CHUNK_SIZE})",
)

def chunk_generator() -> Generator[bytes, None, None]:
@@ -163,7 +178,12 @@ def chunk_generator() -> Generator[bytes, None, None]:
break
yield chunk

return self.upload_parts(bucket, key, parts=chunk_generator(), create_params=create_params)
return self.upload_parts(
bucket,
key,
parts=chunk_generator(),
create_params=create_params,
)

def determine_chunk_size_from_file_size(self, file_size: Optional[int]) -> int:
if file_size:
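
The boto3_multipart_upload changes above are formatting only. For orientation, a minimal usage sketch of MultipartUploader (not part of this commit); the keyword names bucket, key, and fp for upload_file, as well as the bucket and file names, are assumptions inferred from the diff:

import boto3

from hai.boto3_multipart_upload import MultipartUploader

s3 = boto3.client("s3")
uploader = MultipartUploader(s3)

# Handlers receive the keyword arguments built in the emit() calls above,
# plus the "sender" and "event" keys injected by EventEmitter.emit().
uploader.on("progress", lambda **kwargs: print("uploaded", kwargs["bytes_uploaded"], "bytes"))
uploader.on("part-error", lambda **kwargs: print("part failed:", kwargs["exception"]))

with open("large_file.bin", "rb") as fp:  # hypothetical local file
    result = uploader.upload_file(bucket="my-bucket", key="large_file.bin", fp=fp)
print(result)  # the complete_multipart_upload response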
25 changes: 14 additions & 11 deletions hai/event_emitter.py
@@ -1,6 +1,6 @@
from typing import Any, Callable, Dict, Optional, Set

DICT_NAME = '_event_emitter_dict'
DICT_NAME = "_event_emitter_dict"

Handler = Callable[..., Any]

@@ -13,24 +13,27 @@ class EventEmitter:
event_types: Set[str] = set()

def on(self, event: str, handler: Handler) -> None:
if event != '*' and event not in self.event_types:
raise ValueError(f'event type {event} is not known')
if event != "*" and event not in self.event_types:
raise ValueError(f"event type {event} is not known")

_get_event_emitter_dict(self).setdefault(event, set()).add(handler)

def off(self, event: str, handler: Handler) -> None:
_get_event_emitter_dict(self).get(event, set()).discard(handler)

def emit(self, event: str, args: Optional[Dict[str, Any]] = None, quiet: bool = True) -> None:
def emit(
self,
event: str,
args: Optional[Dict[str, Any]] = None,
quiet: bool = True,
) -> None:
if event not in self.event_types:
raise ValueError(f'event type {event} is not known')
raise ValueError(f"event type {event} is not known")
emitter_dict = _get_event_emitter_dict(self)
handlers = (
emitter_dict.get(event, set()) | emitter_dict.get('*', set())
)
args = (args or {})
args.setdefault('sender', self)
args.setdefault('event', event)
handlers = emitter_dict.get(event, set()) | emitter_dict.get("*", set())
args = args or {}
args.setdefault("sender", self)
args.setdefault("event", event)
for handler in handlers:
try:
handler(**args)
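
For reference, the EventEmitter contract that the reformatted code above implements, as a short sketch; the Downloader subclass is hypothetical, while on, off, emit, event_types, and the "*" wildcard are all visible in the diff:

from hai.event_emitter import EventEmitter

class Downloader(EventEmitter):  # hypothetical subclass, for illustration only
    event_types = {"progress"}

def log_event(sender, event, **kwargs):
    # emit() always injects "sender" and "event" into the handler kwargs.
    print(f"{event} from {sender!r}: {kwargs}")

d = Downloader()
d.on("progress", log_event)
d.on("*", log_event)  # a "*" handler receives every known event type
d.emit("progress", {"done": 3, "total": 10})
d.off("*", log_event)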
25 changes: 17 additions & 8 deletions hai/parallel.py
@@ -11,7 +11,12 @@ class ParallelException(Exception):


class TaskFailed(ParallelException):
def __init__(self, message: str, task: "ApplyResult[Any]", exception: Exception) -> None:
def __init__(
self,
message: str,
task: "ApplyResult[Any]",
exception: Exception,
) -> None:
super().__init__(message)
self.task = task
self.task_name = str(getattr(task, "name", None))
@@ -43,7 +48,9 @@ class ParallelRun:
"""

def __init__(self, parallelism: Optional[int] = None) -> None:
self.pool = ThreadPool(processes=(parallelism or (int(os.cpu_count() or 1) * 2)))
self.pool = ThreadPool(
processes=(parallelism or (int(os.cpu_count() or 1) * 2)),
)
self.task_complete_event = threading.Event()
self.tasks: List[ApplyResult[Any]] = []
self.completed_tasks: WeakSet[ApplyResult[Any]] = WeakSet()
@@ -79,7 +86,7 @@ def add_task(
:param kwargs: Keyword arguments, if any.
"""
if not name:
name = (getattr(task, '__name__' or None) or str(task)) # type: ignore[arg-type]
name = getattr(task, "__name__" or None) or str(task) # type: ignore[arg-type]
p_task = self.pool.apply_async(
task,
args=args,
@@ -102,7 +109,7 @@ def wait(
fail_fast: bool = True,
interval: float = 0.5,
callback: Optional[Callable[["ParallelRun"], None]] = None,
max_wait: Optional[float] = None
max_wait: Optional[float] = None,
) -> List["ApplyResult[Any]"]:
"""
Wait until all of the current tasks have finished,
@@ -136,7 +143,7 @@ def wait(

while True:
if max_wait:
waited_for = (time.time() - start_time)
waited_for = time.time() - start_time
if waited_for > max_wait:
raise TimeoutError(f"Waited for {waited_for}/{max_wait} seconds.")

@@ -159,7 +166,9 @@
# Reset the flag in case it had been set
self.task_complete_event.clear()

return list(self.completed_tasks) # We can just as well return the completed tasks.
return list(
self.completed_tasks,
) # We can just as well return the completed tasks.

def _wait_tick(self, fail_fast: bool) -> bool:
# Keep track of whether there were any incomplete tasks this loop.
@@ -189,7 +198,7 @@ def _wait_tick(self, fail_fast: bool) -> bool:
# raising the exception directly.
if fail_fast and not task._success: # type: ignore[attr-defined]
exc = task._value # type: ignore[attr-defined]
message = f'[{task.name}] {str(exc)}' # type: ignore[attr-defined]
message = f"[{task.name}] {str(exc)}" # type: ignore[attr-defined]
raise TaskFailed(
message,
task=task,
@@ -205,7 +214,7 @@ def maybe_raise(self) -> None:
exceptions = self.exceptions
if exceptions:
raise TasksFailed(
'%d exceptions occurred' % len(exceptions),
f"{len(exceptions)} exceptions occurred",
exception_map=exceptions,
)

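
Likewise for ParallelRun, a purely illustrative sketch (not from the diff) built from the methods visible above; the add_task parameters name and args, and the pool cleanup at the end, are assumptions:

import time

from hai.parallel import ParallelRun, TaskFailed

def fetch(n):  # stand-in task, for illustration only
    time.sleep(0.1)
    return n * 2

run = ParallelRun(parallelism=4)
for i in range(10):
    run.add_task(fetch, name=f"fetch-{i}", args=(i,))

try:
    # With fail_fast=True, the first failed task raises TaskFailed immediately.
    completed = run.wait(fail_fast=True, interval=0.1, max_wait=30)
    print([task.get() for task in completed])
except TaskFailed as exc:
    print("task failed:", exc.task_name)
finally:
    run.pool.close()  # assumption: the caller is responsible for the ThreadPool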
19 changes: 10 additions & 9 deletions hai/pipe_pump.py
@@ -8,6 +8,7 @@ class BasePipePump:
"""
Pump file objects into buffers.
"""

read_size = 1024

def __init__(self) -> None:
@@ -23,7 +24,7 @@ def register(self, key: str, fileobj: Optional[IO[bytes]]) -> None:
:param fileobj: File object to poll.
"""
key = str(key)
self.buffers[key] = b''
self.buffers[key] = b""
if fileobj:
self.selector.register(fileobj, selectors.EVENT_READ, data=key)

@@ -45,7 +46,7 @@ def pump(self, timeout: float = 0, max_reads: int = 1) -> int:
while read_num < max_reads:
read_num += 1
should_repeat = False
for (key, _event) in self.selector.select(timeout=timeout):
for key, _event in self.selector.select(timeout=timeout):
fileobj: IO[bytes] = key.fileobj # type: ignore[assignment]
data = fileobj.read(self.read_size)
self.feed(key.data, data)
@@ -106,7 +107,7 @@ def pumper() -> None:
while self.selector is not None:
self.pump(timeout=interval)

return threading.Thread(target=pumper, name=f'Thread for {self!r}')
return threading.Thread(target=pumper, name=f"Thread for {self!r}")


LineHandler = Callable[[str, List[bytes]], None]
@@ -118,7 +119,7 @@ class LinePipePump(BasePipePump):
separated by a given bytestring.
"""

def __init__(self, separator: bytes = b'\n') -> None:
def __init__(self, separator: bytes = b"\n") -> None:
"""
:param separator: Line separator byte sequence.
"""
@@ -155,7 +156,7 @@ def add_line(self, key: str, line: bytes) -> None:
"""
key = str(key)
if not isinstance(line, bytes):
line = line.encode('utf-8')
line = line.encode("utf-8")
line_list = self.lines.setdefault(key, [])
line_list.append(line)

@@ -210,7 +211,7 @@ def add_chunk_handler(self, handler: ChunkHandler) -> None:

def _process_buffer(self, key: str, buffer: bytes) -> bytes:
while len(buffer) >= self.chunk_size:
chunk, buffer = buffer[:self.chunk_size], buffer[self.chunk_size:]
chunk, buffer = buffer[: self.chunk_size], buffer[self.chunk_size :]
self._handle_chunk(key, chunk)
return buffer

@@ -237,7 +238,7 @@ class CRLFPipePump(BasePipePump):
Unlike LinePipePump, this does not buffer any history in its own state, only the last line.
"""

CRLF_SEP_RE = re.compile(br'^(.*?)([\r\n])')
CRLF_SEP_RE = re.compile(rb"^(.*?)([\r\n])")

def __init__(self) -> None:
super().__init__()
@@ -263,8 +264,8 @@ def _process_buffer(self, key: str, buffer: bytes) -> bytes:
m = self.CRLF_SEP_RE.match(buffer)
if not m:
break
self._process_line(key, m.group(1), is_replace=(m.group(2) == b'\r'))
buffer = buffer[m.end():]
self._process_line(key, m.group(1), is_replace=(m.group(2) == b"\r"))
buffer = buffer[m.end() :]
return buffer

def _process_line(self, key: str, new_content: bytes, is_replace: bool) -> None:
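
Finally, an illustrative sketch (not from the diff) of LinePipePump collecting a subprocess's output, using only the register/pump/lines API visible above; the exact draining and flushing behaviour of the real class may differ:

import subprocess

from hai.pipe_pump import LinePipePump

proc = subprocess.Popen(["echo", "hello"], stdout=subprocess.PIPE)

pump = LinePipePump(separator=b"\n")
pump.register("stdout", proc.stdout)

while proc.poll() is None:
    pump.pump(timeout=0.1)
pump.pump(timeout=0, max_reads=10)  # drain whatever is still buffered

print(pump.lines.get("stdout", []))  # expected to contain b"hello"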