From 4ee9c272e279437a284b9020ad6ef615c64d32cb Mon Sep 17 00:00:00 2001 From: Michael Hanke Date: Tue, 18 Jun 2024 09:27:53 +0200 Subject: [PATCH 1/3] rf(`iterable_subprocess`): removed package and use from `datasalad` This change introduces a new dependency to the `datasalad` library. This library provides (updated) implementations of the `iterable_subprocess` package that was a part of `datalad-next`, and is removed here. This migration poses the challenge of associated API changes and a new exception type (`datasalad.runners.CommandError`). This is (for now) mitigated here by providing a shim for `iter_subproc()`, the main entrypoint, that translates the API and exception. This shim is immediately marked as deprecated. Future migrations are expected to strengthen the dependency on `datasalad` further. --- datalad_next/iterable_subprocess/LICENSE | 21 -- datalad_next/iterable_subprocess/README.md | 81 ---- datalad_next/iterable_subprocess/__init__.py | 14 +- datalad_next/iterable_subprocess/codecov.yml | 1 - .../iterable_subprocess.py | 206 ----------- .../iterable_subprocess/pyproject.toml | 31 -- .../test_iterable_subprocess.py | 347 ------------------ datalad_next/runners/iter_subproc.py | 35 +- .../runners/tests/test_iter_subproc.py | 33 -- datalad_next/shell/operations/common.py | 2 +- datalad_next/shell/response_generators.py | 2 +- datalad_next/shell/shell.py | 12 +- .../shell/tests/test_response_generators.py | 2 +- datalad_next/shell/tests/test_shell.py | 6 +- datalad_next/url_operations/ssh.py | 10 +- setup.cfg | 1 + 16 files changed, 49 insertions(+), 755 deletions(-) delete mode 100644 datalad_next/iterable_subprocess/LICENSE delete mode 100644 datalad_next/iterable_subprocess/README.md delete mode 100644 datalad_next/iterable_subprocess/codecov.yml delete mode 100644 datalad_next/iterable_subprocess/iterable_subprocess.py delete mode 100644 datalad_next/iterable_subprocess/pyproject.toml delete mode 100644 
datalad_next/iterable_subprocess/test_iterable_subprocess.py delete mode 100644 datalad_next/runners/tests/test_iter_subproc.py diff --git a/datalad_next/iterable_subprocess/LICENSE b/datalad_next/iterable_subprocess/LICENSE deleted file mode 100644 index 5a6779fd..00000000 --- a/datalad_next/iterable_subprocess/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2021 Department for International Trade - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. 
diff --git a/datalad_next/iterable_subprocess/README.md b/datalad_next/iterable_subprocess/README.md deleted file mode 100644 index 045ee2d7..00000000 --- a/datalad_next/iterable_subprocess/README.md +++ /dev/null @@ -1,81 +0,0 @@ -# iterable-subprocess - -[![PyPI package](https://img.shields.io/pypi/v/iterable-subprocess?label=PyPI%20package&color=%234c1)](https://pypi.org/project/iterable-subprocess/) [![Test suite](https://img.shields.io/github/actions/workflow/status/uktrade/iterable-subprocess/test.yml?label=Test%20suite)](https://github.com/uktrade/iterable-subprocess/actions/workflows/test.yml) [![Code coverage](https://img.shields.io/codecov/c/github/uktrade/iterable-subprocess?label=Code%20coverage)](https://app.codecov.io/gh/uktrade/iterable-subprocess) - -Python context manager to communicate with a subprocess using iterables. This offers a higher level interface to subprocesses than Python's built-in subprocess module, and is particularly helpful when data won't fit in memory and has to be streamed. - -This also allows an external subprocess to be naturally placed in a chain of iterables as part of a data processing pipeline. - - -## Installation - -```bash -pip install iterable-subprocess -``` - - -## Usage - -A single context manager `iterable_subprocess` is exposed. The first parameter is the `args` argument passed to the [Popen Constructor](https://docs.python.org/3/library/subprocess.html#popen-constructor), and the second is an iterable whose items must be `bytes` instances and are sent to the subprocess's standard input. - -Returned from the function is an iterable whose items are `bytes` instances of the process's standard output. 
- -```python -from iterable_subprocess import iterable_subprocess - -# In a real case could be a generator function that reads from the filesystem or the network -iterable_of_bytes = ( - b'first\n', - b'second\n', - b'third\n', -) - -with iterable_subprocess(['cat'], iterable_of_bytes) as output: - for chunk in output: - print(chunk) -``` - - -## Exceptions - -Python's `subprocess.Popen` is used to start the process, and any exceptions it raises are propagated without transformation. For example, if the subprocess can't be found, then a `FileNotFoundError` is raised. - -If the process starts, but exits with a non-zero return code, then an `iterable_subprocess.IterableSubprocessError` exception will be raised with two members: - -- `returncode` - the return code of the process -- `stderr` - the final 65536 bytes of the standard error of the process - -However, if the process starts, but an exception is raised from inside the context or from the source iterable, then this exception is propagated, even if the process subsequently exits with a non-zero return code. - - -## Example: unzip the first file of a ZIP archive while downloading - -It's possible to download the bytes of a ZIP file in Python, and unzip by passing the bytes to `funzip`, as in the following example. - -```python -import httpx -from iterable_subprocess import iterable_subprocess - -with \ - httpx.stream('GET', 'https://www.example.com/my.zip') as r, \ - iterable_subprocess(['funzip'], r.iter_bytes()) as unzipped_chunks: - - for chunk in unzipped_chunks: - print(chunk) -``` - -Note that it's also possible to stream unzip files without resorting to another process using [stream-unzip](https://github.com/uktrade/stream-unzip). - - -## Example: download file using curl and process in Python - -You would usually download directly from Python, but as an example, you can download using the curl executable and process its output in Python. 
- -```python -from iterable_subprocess import iterable_subprocess - -url = 'https://data.api.trade.gov.uk/v1/datasets/uk-tariff-2021-01-01/versions/v3.0.212/tables/measures-on-declarable-commodities/data?format=csv' -with iterable_subprocess(['curl', '--no-progress-meter', '--fail-with-body', url], ()) as output: - for chunk in output: - print(chunk) -``` diff --git a/datalad_next/iterable_subprocess/__init__.py b/datalad_next/iterable_subprocess/__init__.py index cc245be0..48ea6080 100644 --- a/datalad_next/iterable_subprocess/__init__.py +++ b/datalad_next/iterable_subprocess/__init__.py @@ -1,5 +1,10 @@ """Context manager to communicate with a subprocess using iterables +.. deprecated:: 1.6 + + This code has been moved to the datasalad library. + Use it from ``datasalad.iterable_subprocess`` instead. + This offers a higher level interface to subprocesses than Python's built-in subprocess module, and is particularly helpful when data won't fit in memory and has to be streamed. @@ -13,11 +18,8 @@ The original code was made available under the terms of the MIT License, and was written by Michal Charemza. +""" -.. currentmodule:: datalad_next.iterable_subprocess -.. 
autosummary:: - :toctree: generated +__all__ = ['iterable_subprocess'] - iterable_subprocess -""" -from .iterable_subprocess import iterable_subprocess +from datasalad.iterable_subprocess import iterable_subprocess diff --git a/datalad_next/iterable_subprocess/codecov.yml b/datalad_next/iterable_subprocess/codecov.yml deleted file mode 100644 index 69cb7601..00000000 --- a/datalad_next/iterable_subprocess/codecov.yml +++ /dev/null @@ -1 +0,0 @@ -comment: false diff --git a/datalad_next/iterable_subprocess/iterable_subprocess.py b/datalad_next/iterable_subprocess/iterable_subprocess.py deleted file mode 100644 index 9bf8fb6e..00000000 --- a/datalad_next/iterable_subprocess/iterable_subprocess.py +++ /dev/null @@ -1,206 +0,0 @@ -from collections import deque -from collections.abc import Generator -from contextlib import contextmanager -from subprocess import PIPE, Popen -from threading import Thread - -# Importing from datalad-core to prevent circular imports -from datalad_next.exceptions import CommandError - - -class OutputFrom(Generator): - def __init__(self, stdout, stderr_deque, chunk_size=65536): - self.stdout = stdout - self.stderr_deque = stderr_deque - self.chunk_size = chunk_size - self.returncode = None - - def send(self, _): - chunk = self.stdout.read(self.chunk_size) - if not chunk: - raise StopIteration - return chunk - - def throw(self, typ, value=None, traceback=None): - return super().throw(typ, value, traceback) - - -@contextmanager -def iterable_subprocess( - program, - input_chunks, - chunk_size=65536, - cwd=None, - bufsize=-1, -): - # This context starts a thread that populates the subprocess's standard input. It - # also starts a threads that reads the process's standard error. Otherwise we risk - # a deadlock - there is no output because the process is waiting for more input. 
- # - # This itself introduces its own complications and risks, but hopefully mitigated - # by having a well defined start and stop mechanism that also avoid sending data - # to the process if it's not running - # - # To start, i.e. on entry to the context from client code - # - The process is started - # - The thread to read from standard error is started - # - The thread to populate input is started - # - # When running: - # - The standard input thread iterates over the input, passing chunks to the process - # - While the standard error thread fetches the error output - # - And while this thread iterates over the processe's output from client code - # in the context - # - # To stop, i.e. on exit of the context from client code - # - This thread closes the process's standard output - # - Wait for the standard input thread to exit - # - Wait for the standard error thread to exit - # - Wait for the process to exit - # - # By using context managers internally, this also gives quite strong guarantees that - # the above order is enforced to make sure the thread doesn't send data to the process - # whose standard input is closed and so we don't get BrokenPipe errors - - # Writing to the process can result in a BrokenPipeError. If this then results in - # a non-zero code from the process, the process's standard error probably has useful - # information on the cause of this. However, the non-zero error code happens after - # BrokenPipeError, so propagating "what happens first" isn't helpful in this case. 
- # So, we re-raise BrokenPipeError as _BrokenPipeError so we can catch it after the - # process ends to then allow us to branch on its error code: - # - if it's non-zero raise a CommandError containing its standard error - # - if it's zero, re-raise the original BrokenPipeError - class _BrokenPipeError(Exception): - pass - - @contextmanager - def thread(target, *args): - exception = None - - def wrapper(): - nonlocal exception - try: - target(*args) - except BaseException as e: - exception = e - - t = Thread(target=wrapper) - - def start(): - t.start() - - def join(): - if t.ident: - t.join() - return exception - - yield start, join - - def input_to(stdin): - try: - for chunk in input_chunks: - try: - stdin.write(chunk) - except BrokenPipeError: - raise _BrokenPipeError() - except OSError as e: - if e.errno != 22: - # Errno22 indicates an IO failure with a - # file descriptor (maybe process is dead already) - raise _BrokenPipeError() - else: - # no idea what this could be, let it bubble up - raise - finally: - try: - stdin.close() - except BrokenPipeError: - raise _BrokenPipeError() - except OSError as e: - # silently ignore Errno22, which happens on - # windows when trying to interacted with file descriptors - # associated with a process that exited already - if e.errno != 22: - raise - - def keep_only_most_recent(stderr, stderr_deque): - total_length = 0 - while True: - chunk = stderr.read(chunk_size) - total_length += len(chunk) - if not chunk: - break - stderr_deque.append(chunk) - if total_length - len(stderr_deque[0]) >= chunk_size: - total_length -= len(stderr_deque[0]) - stderr_deque.popleft() - - def raise_if_not_none(exception): - if exception is not None: - raise exception from None - - proc = None - stderr_deque = deque() - chunk_generator = None - exception_stdin = None - exception_stderr = None - - try: - - with \ - Popen( # nosec - all arguments are controlled by the caller - program, - stdin=PIPE, - stdout=PIPE, - stderr=PIPE, - cwd=cwd, - 
bufsize=bufsize, - ) as proc, \ - thread( - keep_only_most_recent, - proc.stderr, - stderr_deque, - ) as (start_t_stderr, join_t_stderr), \ - thread( - input_to, - proc.stdin, - ) as (start_t_stdin, join_t_stdin): - - try: - start_t_stderr() - start_t_stdin() - chunk_generator = OutputFrom( - proc.stdout, - stderr_deque, - chunk_size - ) - yield chunk_generator - except BaseException: - proc.terminate() - raise - finally: - proc.stdout.close() - exception_stdin = join_t_stdin() - exception_stderr = join_t_stderr() - - raise_if_not_none(exception_stdin) - raise_if_not_none(exception_stderr) - - except _BrokenPipeError as e: - if chunk_generator: - chunk_generator.returncode = proc.returncode - if proc.returncode == 0: - raise e.__context__ from None - except BaseException: - if chunk_generator: - chunk_generator.returncode = proc.returncode - raise - - chunk_generator.returncode = proc.returncode - if proc.returncode: - raise CommandError( - cmd=program, - code=proc.returncode, - stderr=b''.join(stderr_deque)[-chunk_size:], - cwd=cwd, - ) diff --git a/datalad_next/iterable_subprocess/pyproject.toml b/datalad_next/iterable_subprocess/pyproject.toml deleted file mode 100644 index c6e311e0..00000000 --- a/datalad_next/iterable_subprocess/pyproject.toml +++ /dev/null @@ -1,31 +0,0 @@ -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[project] -name = "iterable-subprocess" -version = "0.0.0.dev0" -authors = [ - { name="Department for International Trade", email="sre@digital.trade.gov.uk" }, -] -description = "Python context manager to communicate with a subprocess using iterables of bytes rather Python's built-in subprocess module" -readme = "README.md" -requires-python = ">=3.6.7" -classifiers = [ - 'Programming Language :: Python :: 3', - 'License :: OSI Approved :: MIT License', -] - -[project.optional-dependencies] -dev = [ - "psutil", - "pytest-cov", -] - -[project.urls] -"Source" = "https://github.com/uktrade/iterable-subprocess" - 
-[tool.hatch.build] -include = [ - "iterable_subprocess.py" -] diff --git a/datalad_next/iterable_subprocess/test_iterable_subprocess.py b/datalad_next/iterable_subprocess/test_iterable_subprocess.py deleted file mode 100644 index b667d2d5..00000000 --- a/datalad_next/iterable_subprocess/test_iterable_subprocess.py +++ /dev/null @@ -1,347 +0,0 @@ -import io -import sys -import threading -import time -import zipfile - -import psutil -import pytest -from threading import Thread - -from .iterable_subprocess import ( - iterable_subprocess, - CommandError, -) - - -def test_cat_not_necessarily_streamed(): - def yield_small_input(): - yield b'first' - yield b'second' - yield b'third' - - with iterable_subprocess(['cat'], yield_small_input()) as output: - assert b''.join(output) == b'firstsecondthird' - - -def test_cat_streamed(): - latest_input = None - - def yield_input(): - nonlocal latest_input - - for i in range(0, 10000000): - yield b'*' * 10 - latest_input = i - - with iterable_subprocess(['cat'], yield_input()) as output: - latest_input_during_output = [latest_input for _ in output] - - # Make sure the input is progressing during the output. 
In test, there - # are about 915 steps, so checking that it's greater than 50 shouldm't - # make this test too flakey - num_steps = 0 - prev_i = 0 - for i in latest_input_during_output: - if i != prev_i: - num_steps += 1 - prev_i = i - - assert num_steps > 50 - - -def test_process_closed_after(): - # in datalad-next we do not necessarily have no child-processes - # so determine the number of test incrementally - #assert len(psutil.Process().children(recursive=True)) == 0 - n_children = len(psutil.Process().children(recursive=True)) - with iterable_subprocess(['cat'], ()) as output: - assert len(psutil.Process().children(recursive=True)) == (n_children + 1) - assert len(psutil.Process().children(recursive=True)) == n_children - - -def test_exception_from_input_before_yield_propagated(): - def yield_input(): - raise Exception('Something went wrong') - - with pytest.raises(Exception, match='Something went wrong'): - with iterable_subprocess(['cat'], yield_input()) as output: - pass - - -def test_exception_from_input_after_yield_propagated(): - def yield_input(): - yield b'*' - raise Exception('Something went wrong') - - with pytest.raises(Exception, match='Something went wrong'): - with iterable_subprocess(['cat'], yield_input()) as output: - pass - - -def test_exception_from_input_incorrect_type_propagated(): - def yield_input(): - yield 'this-should-be-bytes' - - - with pytest.raises(TypeError): - with iterable_subprocess(['cat'], yield_input()) as output: - pass - - -@pytest.mark.parametrize("size", [ - 1, 100, 10000, 1000000, -]) -def test_exception_from_output_during_input_iterating_propagates_and_does_not_hang(size): - event = threading.Event() - - def yield_input(): - while True: - event.set() - yield b'*' * size - - with pytest.raises(Exception, match='My error'): - with iterable_subprocess(['cat'], yield_input()) as output: - event.wait() - raise Exception('My error') - - -@pytest.mark.parametrize("chunk_size", [ - 1, 100, 10000, 1000000, -]) 
-@pytest.mark.parametrize("at_iteration", [ - 0, 1, 100, -]) -def test_exception_from_output_iterating_propagates_and_does_not_hang(at_iteration, chunk_size): - def yield_input(): - while True: - yield b'*' * chunk_size - - with pytest.raises(Exception, match='My error'): - with iterable_subprocess(['cat'], yield_input(), chunk_size=chunk_size) as output: - for i, chunk in enumerate(output): - if i == at_iteration: - raise Exception('My error') - - -def test_exception_from_not_found_process_propagated(): - with pytest.raises(FileNotFoundError): - with iterable_subprocess(['does-not-exist'], ()) as output: - b''.join(output) - - -def test_exception_from_return_code(monkeypatch): - monkeypatch.setenv('LANG', 'C') - with pytest.raises(CommandError, match='No such file or directory') as excinfo: - with iterable_subprocess(['ls', 'does-not-exist'], ()) as output: - a = b''.join(output) - - assert excinfo.value.returncode > 0 - assert b'No such file or directory' in excinfo.value.stderr - - -def test_exception_from_context_even_though_return_code_with_long_standard_error(): - with pytest.raises(Exception, match="Another exception"): - with iterable_subprocess([sys.executable, '-c', 'import sys; print("Out"); print("Error message" * 100000, file=sys.stderr); sys.exit(1)'], ()) as output: - for _ in output: - pass - raise Exception('Another exception') - - -def test_exception_from_return_code_with_long_standard_error(): - with pytest.raises(CommandError) as excinfo: - with iterable_subprocess([sys.executable, '-c', 'import sys; print("Out"); print("Error message" * 100000, file=sys.stderr); sys.exit(2)'], ()) as output: - for _ in output: - pass - - assert excinfo.value.returncode == 2 - assert len(excinfo.value.stderr) == 65536 - - -def test_if_process_exits_with_non_zero_error_code_and_inner_exception_it_propagates(): - def yield_input(): - while True: - yield b'*' * 10 - - with pytest.raises(Exception, match='Another exception'): - with iterable_subprocess([ - 
sys.executable, '-c', 'import sys; print("The error", file=sys.stderr); print("After output"); sys.exit(1)', - ], yield_input()) as output: - all_output = b''.join(output) - raise Exception('Another exception') - - # rstrip to account for different platform line endings here - assert all_output.rstrip() == b'After output' - - - -def test_if_process_closes_standard_input_but_exits_with_non_zero_error_code_then_broken_pipe_error(): - def yield_input(): - while True: - yield b'*' * 10 - - with pytest.raises(BrokenPipeError): - with iterable_subprocess([ - sys.executable, '-c', 'import sys; sys.stdin.close(); print("The error", file=sys.stderr); print("After output"); sys.exit(0)', - ], yield_input()) as output: - all_output = b''.join(output) - - # rstrip to account for different platform line endings here - assert all_output.rstrip() == b'After output' - - -def test_if_process_closes_standard_input_but_exits_with_non_zero_error_code_then_iterable_subprocess_error(): - def yield_input(): - while True: - yield b'*' * 10 - - with pytest.raises(CommandError) as excinfo: - with iterable_subprocess([ - sys.executable, '-c', 'import sys; sys.stdin.close(); print("The error", file=sys.stderr); print("After output"); sys.exit(3)', - ], yield_input()) as output: - all_output = b''.join(output) - - # rstrip to account for different platform line endings here - assert all_output.rstrip() == b'After output' - assert excinfo.value.returncode == 3 - assert excinfo.value.stderr.rstrip()== b'The error' - - -def test_program_that_outputs_for_a_long_time_is_interrupted_on_context_exit(): - start = time.monotonic() - - with pytest.raises(CommandError) as excinfo: - with iterable_subprocess([sys.executable, '-c', 'import time; start = time.monotonic()\nwhile (time.monotonic() - start) < 60:\n print("Output" * 1000)'], ()) as output: - pass - - end = time.monotonic() - - assert excinfo.value.returncode != 0 - # alternative condition reflects error communication on windows (errno22) - 
assert b'BrokenPipeError' in excinfo.value.stderr or b'Errno 22' in excinfo.value.stderr - assert end - start < 10 - - -def test_program_that_sleeps_exits_quickly_if_exception(): - start = time.monotonic() - - with pytest.raises(Exception, match='From context'): - with iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()) as output: - raise Exception('From context') - - end = time.monotonic() - - assert end - start < 10 - - -def test_program_that_sleeps_exits_quickly_if_keyboard_interrupt(): - start = time.monotonic() - - with pytest.raises(KeyboardInterrupt, match='From context'): - with iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()) as output: - raise KeyboardInterrupt('From context') - - end = time.monotonic() - - assert end - start < 10 - - -def test_program_that_sleeps_exits_quickly_if_keyboard_interrupt_just_before_thread_starts(monkeypatch): - start = time.monotonic() - - def start_that_raises_keyboard_interrupt(self): - raise KeyboardInterrupt('Just before starting thread') - monkeypatch.setattr(Thread, 'start', start_that_raises_keyboard_interrupt) - - with pytest.raises(KeyboardInterrupt, match='Just before starting thread'): - iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()).__enter__() - - end = time.monotonic() - - assert end - start < 10 - - -def test_program_that_sleeps_exits_quickly_if_keyboard_interrupt_just_after_thread_starts(monkeypatch): - start = time.monotonic() - - original_start = Thread.start - def start_that_raises_keyboard_interrupt(self): - original_start(self) - raise KeyboardInterrupt('Just after starting thread') - monkeypatch.setattr(Thread, 'start', start_that_raises_keyboard_interrupt) - - with pytest.raises(KeyboardInterrupt, match='Just after starting thread'): - iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()).__enter__() - - end = time.monotonic() - - assert end - start < 10 - - -def 
test_program_that_sleeps_not_quickly_if_no_exception(): - start = time.monotonic() - - with iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(2)'], ()) as output: - pass - - end = time.monotonic() - - assert end - start > 2 - - -def test_funzip_no_compression(): - contents = b'*' * 100000 - - def yield_input(): - file = io.BytesIO() - with zipfile.ZipFile(file, 'w', zipfile.ZIP_STORED) as zf: - zf.writestr('any.txt', contents) - - yield file.getvalue() - - with iterable_subprocess(['funzip'], yield_input()) as output: - assert b''.join(output) == contents - - -def test_funzip_deflate(): - contents = b'*' * 100000 - - def yield_input(): - file = io.BytesIO() - with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf: - zf.writestr('any.txt', contents) - - yield file.getvalue() - - with iterable_subprocess(['funzip'], yield_input()) as output: - assert b''.join(output) == contents - - -def test_error_returncode_available_from_generator(): - with pytest.raises(CommandError): - with iterable_subprocess(['ls', 'does-not-exist'], ()) as ls: - tuple(ls) - assert ls.returncode != 0 - - -def test_error_returncode_available_from_generator_with_exception(): - with pytest.raises(StopIteration): - with iterable_subprocess(['ls', 'does-not-exist'], ()) as ls: - while True: - next(ls) - assert ls.returncode != 0 - - -def test_returncode_available_from_generator_with_exception(): - with pytest.raises(StopIteration): - with iterable_subprocess(['echo', 'a'], ()) as echo: - while True: - next(echo) - # On a Linux system, all exceptions that are raised before the subprocess - # exited will lead to a -15 return code. If StopIteration is raised, the - # subprocess will either have terminated which results in a 0-return code, - # or the subprocess is still running and will therefore be terminated which - # results in a -15 return code. Any other exception than StopIteration, - # e.g. 
a CommandError because echo could not be found, would lead to an - early test-exit and not proceed to the assign-statement. - assert echo.returncode in (0, -15) diff --git a/datalad_next/runners/iter_subproc.py b/datalad_next/runners/iter_subproc.py index 22256ed8..24154773 100644 --- a/datalad_next/runners/iter_subproc.py +++ b/datalad_next/runners/iter_subproc.py @@ -5,11 +5,9 @@ Iterable, List, ) +from datasalad.runners import CommandError as SaladCommandError +from datasalad.iterable_subprocess import iterable_subprocess -from datalad_next.iterable_subprocess.iterable_subprocess import ( - iterable_subprocess, - OutputFrom, -) from datalad_next.exceptions import CommandError from datalad_next.consts import COPY_BUFSIZE @@ -26,6 +24,11 @@ def iter_subproc( ): """Context manager to communicate with a subprocess using iterables + .. deprecated:: 1.6 + + Use ``datasalad.runners.iter_subproc`` instead. Renamed ``input`` argument + to ``inputs``, and raises datalad's ``CommandError``. + This offers a higher level interface to subprocesses than Python's built-in ``subprocess`` module. It allows a subprocess to be naturally placed in a chain of iterables as part of a data processing pipeline. 
@@ -100,10 +103,20 @@ def iter_subproc( ------- contextmanager """ - return iterable_subprocess( - args, - tuple() if input is None else input, - chunk_size=chunk_size, - cwd=cwd, - bufsize=bufsize, - ) + try: + return iterable_subprocess( + args, + tuple() if input is None else input, + chunk_size=chunk_size, + cwd=cwd, + bufsize=bufsize, + ) + except SaladCommandError as e: + raise CommandError( + cmd=e.cmd, + msg=e.msg, + code=e.returncode, + stdout=e.stdout, + stderr=e.stderr, + cwd=e.cwd, + ) from e diff --git a/datalad_next/runners/tests/test_iter_subproc.py b/datalad_next/runners/tests/test_iter_subproc.py deleted file mode 100644 index ac2602b4..00000000 --- a/datalad_next/runners/tests/test_iter_subproc.py +++ /dev/null @@ -1,33 +0,0 @@ -import pytest -import sys - -from ..iter_subproc import ( - iter_subproc, - CommandError, -) - - -def test_iter_subproc_cwd(tmp_path): - test_content = 'some' - test_file_name = 'testfile' - test_file = tmp_path / test_file_name - test_file.write_text(test_content) - - check_fx = \ - "import sys\n" \ - "if open('{input}').read() == '{content}':\n" \ - " print('okidoki')".format( - input=test_file_name, - content=test_content, - ) - # we cannot read the test file without a full path, because - # CWD is not `tmp_path` - with pytest.raises(CommandError) as e: - with iter_subproc([sys.executable, '-c', check_fx]): - pass - assert 'FileNotFoundError' in e.value - - # but if we make it change to CWD, the same code runs - with iter_subproc([sys.executable, '-c', check_fx], cwd=tmp_path) as proc: - out = b''.join(proc) - assert b'okidoki' in out diff --git a/datalad_next/shell/operations/common.py b/datalad_next/shell/operations/common.py index 4db48ea1..ecf4d045 100644 --- a/datalad_next/shell/operations/common.py +++ b/datalad_next/shell/operations/common.py @@ -1,9 +1,9 @@ from __future__ import annotations from abc import ABCMeta +from datasalad.iterable_subprocess.iterable_subprocess import OutputFrom from logging import 
getLogger -from datalad_next.runners.iter_subproc import OutputFrom from ..response_generators import ShellCommandResponseGenerator diff --git a/datalad_next/shell/response_generators.py b/datalad_next/shell/response_generators.py index b2e0ce06..15ec987e 100644 --- a/datalad_next/shell/response_generators.py +++ b/datalad_next/shell/response_generators.py @@ -9,8 +9,8 @@ from collections.abc import Generator from random import randint +from datasalad.iterable_subprocess.iterable_subprocess import OutputFrom from datalad_next.itertools import align_pattern -from datalad_next.runners.iter_subproc import OutputFrom __all__ = [ diff --git a/datalad_next/shell/shell.py b/datalad_next/shell/shell.py index 635c511f..90e9df89 100644 --- a/datalad_next/shell/shell.py +++ b/datalad_next/shell/shell.py @@ -9,6 +9,9 @@ import logging from contextlib import contextmanager from dataclasses import dataclass +from datasalad.iterable_subprocess.iterable_subprocess import OutputFrom +from datasalad.runners.iter_subproc import iter_subproc + from queue import Queue from typing import ( Generator, @@ -20,12 +23,9 @@ VariableLengthResponseGenerator, VariableLengthResponseGeneratorPosix, ) + from datalad_next.consts import COPY_BUFSIZE from datalad_next.exceptions import CommandError -from datalad_next.runners.iter_subproc import ( - OutputFrom, - iter_subproc, -) __all__ = [ @@ -54,7 +54,7 @@ def to_exception(self, if isinstance(command, bytes) else str(command), msg=message, - code=self.returncode, + returncode=self.returncode, stdout=self.stdout, stderr=self.stderr, ) @@ -377,7 +377,7 @@ def train(queue: Queue): subprocess_inputs: Queue = Queue() with iter_subproc(shell_cmd, - input=train(subprocess_inputs), + inputs=train(subprocess_inputs), chunk_size=chunk_size, bufsize=0) as shell_output: diff --git a/datalad_next/shell/tests/test_response_generators.py b/datalad_next/shell/tests/test_response_generators.py index 8c5c28c0..1bd581d5 100644 --- 
a/datalad_next/shell/tests/test_response_generators.py +++ b/datalad_next/shell/tests/test_response_generators.py @@ -4,7 +4,7 @@ import pytest -from datalad_next.runners.iter_subproc import OutputFrom +from datasalad.iterable_subprocess.iterable_subprocess import OutputFrom from ..response_generators import ( FixedLengthResponseGeneratorPosix, VariableLengthResponseGeneratorPosix, diff --git a/datalad_next/shell/tests/test_shell.py b/datalad_next/shell/tests/test_shell.py index a044de3b..d83cb7cc 100644 --- a/datalad_next/shell/tests/test_shell.py +++ b/datalad_next/shell/tests/test_shell.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datasalad.runners import iter_subproc import os import sys from pathlib import PurePosixPath @@ -13,10 +14,7 @@ on_windows, skip_if, ) -from datalad_next.runners import ( - CommandError, - iter_subproc, -) +from datalad_next.exceptions import CommandError from datalad_next.url_operations.ssh import ssh_url2openargs from ..response_generators import ( FixedLengthResponseGeneratorPosix, diff --git a/datalad_next/url_operations/ssh.py b/datalad_next/url_operations/ssh.py index 818a724e..3b4c21a5 100644 --- a/datalad_next/url_operations/ssh.py +++ b/datalad_next/url_operations/ssh.py @@ -5,6 +5,10 @@ import logging import sys +from datasalad.runners import ( + CommandError, + iter_subproc, +) from functools import partial from itertools import chain from pathlib import ( @@ -29,10 +33,6 @@ from datalad_next.consts import COPY_BUFSIZE from datalad_next.config import ConfigManager from datalad_next.itertools import align_pattern -from datalad_next.runners import ( - iter_subproc, - CommandError, -) from .base import UrlOperations @@ -299,7 +299,7 @@ def _perform_upload(self, try: with iter_subproc( cmd, - input=self._with_progress( + inputs=self._with_progress( iter(upload_queue.get, None), progress_id=progress_id, label='uploading', diff --git a/setup.cfg b/setup.cfg index 99ad6bde..1038a3df 100644 --- a/setup.cfg +++ 
b/setup.cfg @@ -15,6 +15,7 @@ classifiers = python_requires = >= 3.8 install_requires = annexremote + datasalad >= 0.1 datalad >= 0.18.4 humanize more-itertools From 3642879a440756fefccf3ca7b9b56f220268e2c6 Mon Sep 17 00:00:00 2001 From: Christian Monch Date: Tue, 18 Jun 2024 11:53:48 +0200 Subject: [PATCH 2/3] fix(tests): remove align_pattern performance test This commit removes the test `datalad_next.itertools.tests.test_align_pattern.test_performance`. It did not assert any properties, but just printed out performance measurements. --- .../itertools/tests/test_align_pattern.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/datalad_next/itertools/tests/test_align_pattern.py b/datalad_next/itertools/tests/test_align_pattern.py index 25a907f9..71aa6f8b 100644 --- a/datalad_next/itertools/tests/test_align_pattern.py +++ b/datalad_next/itertools/tests/test_align_pattern.py @@ -26,25 +26,6 @@ def test_pattern_processor(data_chunks, pattern, expected): assert expected == list(align_pattern(data_chunks, pattern=pattern)) -def test_performance(): - # Ensure that the performance of align_pattern is acceptable for large - # data chunks and patterns. - number = 10 - pattern = b'01234' - data_chunks = [b'a' * 1000 for _ in range(100 * 1000)] + [pattern] - - result_base = timeit.timeit( - lambda: tuple(data_chunks), - number=number, - ) - result_iter = timeit.timeit( - lambda: tuple(align_pattern(data_chunks, pattern=pattern)), - number=number, - ) - - print(result_base, result_iter, result_iter / result_base) - - def test_newline_matches(): pattern = b'----datalad-end-marker-3654137433-rekram-dne-dalatad----\n' chunk1 = b'Have a lot of fun...\n----datalad-end-marker-3654137433-r' From 247ee261433adff8782210c9ff47c8f2f7bf1951 Mon Sep 17 00:00:00 2001 From: Christian Monch Date: Wed, 19 Jun 2024 10:32:33 +0200 Subject: [PATCH 3/3] fix(types): fix type annotations in align_pattern This is a backport from `datasalad`. 
--- datalad_next/itertools/align_pattern.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/datalad_next/itertools/align_pattern.py b/datalad_next/itertools/align_pattern.py index 2f72eb25..1796ca4a 100644 --- a/datalad_next/itertools/align_pattern.py +++ b/datalad_next/itertools/align_pattern.py @@ -7,12 +7,15 @@ from typing import ( Generator, Iterable, + TypeVar ) +S = TypeVar('S', str, bytes, bytearray) -def align_pattern(iterable: Iterable[str | bytes | bytearray], - pattern: str | bytes | bytearray - ) -> Generator[str | bytes | bytearray, None, None]: + +def align_pattern(iterable: Iterable[S], + pattern: S + ) -> Generator[S, None, None]: """ Yield data chunks that contain a complete pattern, if it is present ``align_pattern`` makes it easy to find a pattern (``str``, ``bytes``, @@ -61,8 +64,8 @@ def align_pattern(iterable: Iterable[str | bytes | bytearray], Parameters ---------- - iterable: Iterable - An iterable that yields data chunks. + iterable: Iterable[str | bytes | bytearray] + An iterable that yields data chunks pattern: str | bytes | bytearray The pattern that should be contained in the chunks. Its type must be compatible to the type of the elements in ``iterable``. @@ -75,9 +78,11 @@ def align_pattern(iterable: Iterable[str | bytes | bytearray], pattern multiple times. """ - # Create pattern matcher for all + regex: str | bytes | bytearray + + # Create pattern matcher for all prefixes of the pattern if isinstance(pattern, str): - regex: str | bytes | bytearray = '(' + '|'.join( + regex = '(' + '|'.join( '.' * (len(pattern) - index - 1) + re.escape(pattern[:index]) + '$' for index in range(1, len(pattern)) ) + ')' @@ -91,7 +96,7 @@ def align_pattern(iterable: Iterable[str | bytes | bytearray], # Join data chunks until they are sufficiently long to contain the pattern, # i.e. have at least size: `len(pattern)`. Continue joining, if the chunk # ends with a prefix of the pattern. 
- current_chunk = None + current_chunk: S | None = None for data_chunk in iterable: # get the type of current_chunk from the type of this data_chunk if current_chunk is None: