Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error handling, documentation and tests of itertools #548

Merged
merged 4 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datalad_next/iter_collections/gitworktree.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def _git_ls_files(path, *args):
yield from decode_bytes(
itemize(
r,
separator=b'\0',
sep=b'\0',
keep_ends=False,
)
)
96 changes: 82 additions & 14 deletions datalad_next/itertools/decode_bytes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Iterator that decodes bytes into strings"""
"""Get strings decoded from chunks of bytes """

from __future__ import annotations

Expand All @@ -18,10 +18,61 @@ def decode_bytes(
) -> Generator[str, None, None]:
"""Decode bytes in an ``iterable`` into strings

This function decodes ``bytes`` or ``bytearray`` into ``str`` objects,
using the specified encoding. Importantly, the decoding input can
be spread across multiple chunks of heterogeneous sizes, for example
output read from a process or pieces of a download.

Multi-byte encodings that are spread over multiple byte chunks are
supported, and chunks are joined as necessary. For example, the utf-8
encoding for ö is ``b'\\xc3\\xb6'``. If the encoding is split in the
middle because a chunk ends with ``b'\\xc3'`` and the next chunk starts
with ``b'\\xb6'``, a naive decoding approach like the following would fail:

.. code-block:: python

>>> [chunk.decode() for chunk in [b'\\xc3', b'\\xb6']] # doctest: +SKIP
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 1, in <listcomp>
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 0: unexpected end of data

Compared to:

.. code-block:: python

>>> from datalad_next.itertools import decode_bytes
>>> tuple(decode_bytes([b'\\xc3', b'\\xb6']))
('ö',)

Input chunks are only joined, if it is necessary to properly decode bytes:

.. code-block:: python

>>> from datalad_next.itertools import decode_bytes
>>> tuple(decode_bytes([b'\\xc3', b'\\xb6', b'a']))
('ö', 'a')

If ``backslash_replace`` is ``True``, undecodable bytes will be
replaced with a backslash-substitution. Otherwise,
undecodable bytes will raise a ``UnicodeDecodeError``:

.. code-block:: python

>>> tuple(decode_bytes([b'\\xc3']))
('\\\\xc3',)
>>> tuple(decode_bytes([b'\\xc3'], backslash_replace=False)) # doctest: +SKIP
Traceback (most recent call last):
...
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 1: invalid continuation byte

Backslash-replacement of undecodable bytes is an ambiguous mapping,
because, for example, ``b'\\xc3'`` can already be present in the input.

Parameters
----------
iterable: Iterable[bytes]
Iterable that yields bytes that should be decoded.
Iterable that yields bytes that should be decoded
encoding: str (default: ``'utf-8'``)
Encoding to be used for decoding.
backslash_replace: bool (default: ``True``)
Expand All @@ -41,7 +92,26 @@ def decode_bytes(
If ``backslash_replace`` is ``False`` and the data yielded by
``iterable`` cannot be decoded with the specified ``encoding``
"""

def handle_decoding_error(position: int,
exc: UnicodeDecodeError
) -> tuple[int, str]:
""" Handle a UnicodeDecodeError """
if not backslash_replace:
# Signal the error to the caller
raise exc
else:
return (
position + exc.end,
joined_data[:position + exc.start].decode(encoding)
+ joined_data[position + exc.start:position + exc.end].decode(
encoding,
errors='backslashreplace'
),
)

joined_data = b''
pending_error = None
position = 0
for chunk in iterable:
joined_data += chunk
Expand All @@ -60,17 +130,15 @@ def decode_bytes(
# next chunk, which might fix the problem.
if position + e.end == len(joined_data):
# Wait for the next chunk, which might fix the problem
pending_error = e
break
else:
if not backslash_replace:
# Signal the error to the caller
raise
else:
yield (
joined_data[:position + e.start].decode(encoding)
+ joined_data[position + e.start:position + e.end].decode(
encoding,
errors='backslashreplace'
)
)
position += e.end
pending_error = None
position, string = handle_decoding_error(position, e)
yield string

if pending_error:
# If the last chunk has a decoding error at the end, process it.
position, string = handle_decoding_error(position, pending_error)
if string:
yield string
104 changes: 66 additions & 38 deletions datalad_next/itertools/itemize.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" Generator the emits only complete lines """
"""Get complete items from input chunks"""

from __future__ import annotations

Expand All @@ -13,64 +13,92 @@

def itemize(
iterable: Iterable[bytes | str],
separator: str | bytes | None,
sep: str | bytes | None,
*,
keep_ends: bool = False,
) -> Generator[bytes | str, None, None]:
""" Generator that emits only complete items from chunks of an iterable
"""Yields complete items (only), assembled from an iterable

This generator consumes chunks from an iterable and yields items defined by
a separator. An item might span multiple input chunks.
This function consumes chunks from an iterable and yields items defined by
a separator. An item might span multiple input chunks. Input (chunks) can
be ``bytes``, ``bytearray``, or ``str`` objects. The result type is
determined by the type of the first input chunk. During its runtime, the
type of the elements in ``iterable`` must not change.

Items are defined by a ``separator``. If ``separator`` is ``None``, the
line-separators built into `str.plitlines` are used.
Items are defined by a separator given via ``sep``. If ``sep`` is ``None``,
the line-separators built into ``str.splitlines()`` are used, and each
yielded item will be a line. If ``sep`` is not `None`, its type must match
the type of the elements in ``iterable``.

The generator works on string or byte chunks, depending on the type of the
first element in ``iterable``. During its runtime, the type of the elements
in ``iterable`` must not change. If ``separator`` is not `None`, its type
must match the type of the elements in ``iterable``.
A separator could, for example, be ``b'\\n'``, in which case the items
would be terminated by Unix line-endings, i.e. each yielded item is a
single line. The separator could also be, ``b'\\x00'`` (or ``'\\x00'``),
to split zero-byte delimited content, like the output of
``git ls-files -z``.

The complexity of itemization without a defined separator is higher than
the complexity of itemization with a defined separator (this is due to
the externally unavailable set of line-separators that are built into
`splitlines`).
Separators can be longer than one byte or character, e.g. ``b'\\r\\n'``, or
``b'\\n-------------------\\n'``.

Runtime with ``keep_end=False`` is faster than otherwise, when a separator
is defined.
Content after the last separator, possibly merged across input chunks, is
always yielded as the last item, even if it is not terminated by the
separator.

EOF ends all lines, but will never be present in the result, even if
``keep_ends`` is ``True``.
Performance notes:

- Using ``None`` as a separator (splitlines-mode) is slower than providing
a specific separator.
- If another separator than ``None`` is used, the runtime with ``keep_end=False`` is faster than with ``keep_end=True``.

Parameters
----------
iterable: Iterable[bytes | str]
The iterable that yields the input data
separator: str | bytes | None
sep: str | bytes | None
The separator that defines items. If ``None``, the items are
determined by the line-separators that are built into `splitlines`.
determined by the line-separators that are built into
``str.splitlines()``.
keep_ends: bool
If `True`, the item-separator will be present at the end of a
yielded item line. If `False`, items will not contain the
separator. Preserving separators an additional implies a runtime cost.
If `True`, the item-separator will remain at the end of a
yielded item. If `False`, items will not contain the
separator. Preserving separators implies a runtime cost, unless the separator is ``None``.

Yields
------
bytes | str
The items determined from the input iterable. The type of the yielded
lines depends on the type of the first element in ``iterable``.
items depends on the type of the first element in ``iterable``.

Examples
--------

.. code-block:: python

>>> from datalad_next.itertools import itemize
>>> with open('/etc/passwd', 'rt') as f: # doctest: +SKIP
... print(tuple(itemize(iter(f.read, ''), sep=None))[0:2]) # doctest: +SKIP
('root:x:0:0:root:/root:/bin/bash',
'systemd-timesync:x:497:497:systemd Time Synchronization:/:/usr/sbin/nologin')
>>> with open('/etc/passwd', 'rt') as f: # doctest: +SKIP
... print(tuple(itemize(iter(f.read, ''), sep=':'))[0:10]) # doctest: +SKIP
('root', 'x', '0', '0', 'root', '/root',
'/bin/bash\\nsystemd-timesync', 'x', '497', '497')
>>> with open('/etc/passwd', 'rt') as f: # doctest: +SKIP
... print(tuple(itemize(iter(f.read, ''), sep=':', keep_ends=True))[0:10]) # doctest: +SKIP
('root:', 'x:', '0:', '0:', 'root:', '/root:',
'/bin/bash\\nsystemd-timesync:', 'x:', '497:', '497:')
"""
if separator is None:
if sep is None:
yield from _split_lines(iterable, keep_ends=keep_ends)
else:
yield from _split_lines_with_separator(
yield from _split_items_with_separator(
iterable,
separator=separator,
sep=sep,
keep_ends=keep_ends,
)


def _split_lines_with_separator(iterable: Iterable[bytes | str],
separator: str | bytes,
def _split_items_with_separator(iterable: Iterable[bytes | str],
sep: str | bytes,
keep_ends: bool = False,
) -> Generator[bytes | str, None, None]:
assembled = None
Expand All @@ -79,20 +107,20 @@ def _split_lines_with_separator(iterable: Iterable[bytes | str],
assembled = chunk
else:
assembled += chunk
lines = assembled.split(sep=separator)
if len(lines) == 1:
items = assembled.split(sep=sep)
if len(items) == 1:
continue

if assembled.endswith(separator):
if assembled.endswith(sep):
assembled = None
else:
assembled = lines[-1]
lines.pop(-1)
assembled = items[-1]
items.pop(-1)
if keep_ends:
for line in lines:
yield line + separator
for item in items:
yield item + sep
else:
yield from lines
yield from items

if assembled:
yield assembled
Expand Down
16 changes: 16 additions & 0 deletions datalad_next/itertools/tests/test_decode_bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import sys
import timeit

import pytest

from ..decode_bytes import decode_bytes


Expand All @@ -24,6 +26,14 @@ def test_unfixable_error_decoding():
assert ''.join(r) == 'abc\\xc3deföghi'


def test_undecodable_byte():
# check that a single undecodable byte is handled properly
r = tuple(decode_bytes([b'\xc3']))
assert ''.join(r) == '\\xc3'
with pytest.raises(UnicodeDecodeError):
tuple(decode_bytes([b'\xc3'], backslash_replace=False))


def test_performance():
encoded = 'ö'.encode('utf-8')
part_1, part_2 = encoded[:1], encoded[1:]
Expand All @@ -33,3 +43,9 @@ def test_performance():

d1 = timeit.timeit(lambda: tuple(decode_bytes(iterable)), number=1000000)
print(d1, file=sys.stderr)


def test_no_empty_strings():
# check that empty strings are not yielded
r = tuple(decode_bytes([b'\xc3', b'\xb6']))
assert r == ('ö',)
6 changes: 3 additions & 3 deletions datalad_next/itertools/tests/test_itemize.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ def test_assembling_and_splitting(input_chunks, separator):
assert len(r) == 3
assert empty.join(r) == empty.join(input_chunks)

r = tuple(itemize(input_chunks, separator=separator, keep_ends=True))
r = tuple(itemize(input_chunks, sep=separator, keep_ends=True))
assert len(r) == 3
assert empty.join(r) == empty.join(input_chunks)

r = tuple(itemize(input_chunks, separator=separator))
r = tuple(itemize(input_chunks, sep=separator))
assert len(r) == 3
assert empty.join(r) == empty.join(input_chunks).replace(separator, empty)

r = tuple(itemize(input_chunks + input_chunks[:1], separator=separator, keep_ends=True))
r = tuple(itemize(input_chunks + input_chunks[:1], sep=separator, keep_ends=True))
assert len(r) == 4
assert r[3] == input_chunks[0]
Loading