Add PersistentSubShell-feature #596

Merged on Apr 18, 2024 (30 commits)
Changes from 19 commits

Commits
4261fdf
add `bufsize` parameter to iterable subprocesses
christian-monch Feb 26, 2024
f592614
improve `align_pattern` performance
christian-monch Feb 26, 2024
025c18a
add `datalad_next.shell.shell` context manager
christian-monch Feb 26, 2024
68c22af
[temp] add safe_read for upload
christian-monch Mar 7, 2024
d57f533
add `safe_read` wrapper to fix a bug
christian-monch Mar 8, 2024
45c5fac
add progress callbacks for download/upload
christian-monch Mar 11, 2024
81d8554
update docstring
christian-monch Mar 12, 2024
6cb2b7c
raise CommandError if shell commands fail
christian-monch Mar 12, 2024
2b6917c
fix type definitions
christian-monch Mar 13, 2024
c5c776e
increase test coverage of shell-code
christian-monch Mar 13, 2024
c1b964a
support `str`-commands in ShellCommandExecutor
christian-monch Mar 13, 2024
ee4bdf5
clean up code and improve names
christian-monch Mar 14, 2024
950ec40
fix an error in `str`-command handling
christian-monch Mar 14, 2024
f005419
fixed a typo
christian-monch Mar 14, 2024
b849323
use a queue to signal completion in `posix.upload`
christian-monch Mar 15, 2024
1f34132
replace `stat` with `ls` in download-op
christian-monch Mar 18, 2024
620bbe1
fix a comment
christian-monch Mar 26, 2024
028e752
add a -currently unused- credential argument
christian-monch Mar 26, 2024
9d4ae1a
fix information in comments
christian-monch Apr 13, 2024
d006050
address reviewer comments
christian-monch Apr 17, 2024
1a99586
update examples in docstrings
christian-monch Apr 17, 2024
ba8985e
update documentation
christian-monch Apr 17, 2024
e68aa0d
improve doc-strings, remove unused code
christian-monch Apr 17, 2024
7b5635e
Employ `shell` feature in SSH test fixture
mih Apr 17, 2024
c1a2dcf
adapt mih's changes to changed code
christian-monch Apr 17, 2024
2117d65
fix a typo
christian-monch Apr 17, 2024
04b4140
fix escape in docstrings
christian-monch Apr 17, 2024
bcb39b8
fix type issues and a comment
christian-monch Apr 17, 2024
79436f8
fix windows tests
christian-monch Apr 17, 2024
ce445a8
Update datalad_next/shell/operations/posix.py
christian-monch Apr 18, 2024
2 changes: 2 additions & 0 deletions .coveragerc
@@ -14,3 +14,5 @@ source =
[report]
# show lines missing coverage in output
show_missing = True
exclude_also =
    raise NotImplementedError
41 changes: 25 additions & 16 deletions datalad_next/iterable_subprocess/iterable_subprocess.py
@@ -8,12 +8,30 @@
from datalad_next.exceptions import CommandError


class OutputFrom(Generator):
    def __init__(self, stdout, stderr_deque, chunk_size=65536):
        self.stdout = stdout
        self.stderr_deque = stderr_deque
        self.chunk_size = chunk_size
        self.returncode = None

    def send(self, _):
        chunk = self.stdout.read(self.chunk_size)
        if not chunk:
            raise StopIteration
        return chunk

    def throw(self, typ, value=None, traceback=None):
        return super().throw(typ, value, traceback)


@contextmanager
def iterable_subprocess(
program,
input_chunks,
chunk_size=65536,
cwd=None,
bufsize=-1,
):
# This context starts a thread that populates the subprocess's standard input. It
# also starts a thread that reads the process's standard error. Otherwise we risk
@@ -105,20 +123,6 @@ def input_to(stdin):
if e.errno != 22:
raise

class OutputFrom(Generator):
def __init__(self, stdout):
self.stdout = stdout
self.returncode = None

def send(self, _):
chunk = self.stdout.read(chunk_size)
if not chunk:
raise StopIteration
return chunk

def throw(self, typ, value=None, traceback=None):
return super().throw(typ, value, traceback)

def keep_only_most_recent(stderr, stderr_deque):
total_length = 0
while True:
@@ -144,12 +148,13 @@ def raise_if_not_none(exception):
try:

with \
Popen(
Popen( # nosec - all arguments are controlled by the caller
program,
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
cwd=cwd,
bufsize=bufsize,
) as proc, \
thread(
keep_only_most_recent,
@@ -164,7 +169,11 @@ def raise_if_not_none(exception):
try:
start_t_stderr()
start_t_stdin()
chunk_generator = OutputFrom(proc.stdout)
chunk_generator = OutputFrom(
proc.stdout,
stderr_deque,
chunk_size
)
yield chunk_generator
except BaseException:
proc.terminate()
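
The hunks above move `OutputFrom` out of the `iterable_subprocess` closure to module level, so the stream and the chunk size now arrive through the constructor. A minimal sketch of the same generator pattern follows, using an in-memory stream instead of a subprocess pipe. The name `ChunkReader` is hypothetical and not part of the PR; it only illustrates how subclassing `collections.abc.Generator` with `send` and `throw` is enough to get iteration.

```python
import io
from collections.abc import Generator


class ChunkReader(Generator):
    """Hypothetical stand-in for OutputFrom: yields fixed-size chunks."""

    def __init__(self, stream, chunk_size=65536):
        self.stream = stream
        self.chunk_size = chunk_size
        # Placeholder attribute that a caller could fill in later,
        # mirroring the `returncode` attribute in the diff.
        self.returncode = None

    def send(self, _):
        # Generator.__next__ calls send(None); an empty read marks the end.
        chunk = self.stream.read(self.chunk_size)
        if not chunk:
            raise StopIteration
        return chunk

    def throw(self, typ, value=None, traceback=None):
        # Defer to the abstract base class, which raises the exception.
        return super().throw(typ, value, traceback)


reader = ChunkReader(io.BytesIO(b'abcdefgh'), chunk_size=3)
chunks = list(reader)  # [b'abc', b'def', b'gh']
```

Because the class no longer closes over `chunk_size`, it can be imported and type-checked elsewhere, which is presumably why `iter_subproc.py` imports `OutputFrom` later in this diff.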
27 changes: 17 additions & 10 deletions datalad_next/itertools/align_pattern.py
@@ -3,6 +3,7 @@

from __future__ import annotations

import re
from typing import (
Generator,
Iterable,
@@ -74,15 +75,19 @@ def align_pattern(iterable: Iterable[str | bytes | bytearray],
pattern multiple times.
"""

def ends_with_pattern_prefix(data: str | bytes | bytearray,
pattern: str | bytes | bytearray,
) -> bool:
""" Check whether the chunk ends with a prefix of the pattern """
for index in range(len(pattern) - 1, 0, -1):
if data[-index:] == pattern[:index]:
return True
return False

# Create a pattern matcher for all proper, non-empty prefixes of the pattern
if isinstance(pattern, str):
regex: str | bytes | bytearray = '(' + '|'.join(
'.' * (len(pattern) - index - 1) + re.escape(pattern[:index]) + '$'
for index in range(1, len(pattern))
) + ')'
else:
regex = b'(' + b'|'.join(
b'.' * (len(pattern) - index - 1) + re.escape(pattern[:index]) + b'$'
for index in range(1, len(pattern))
) + b')'
pattern_matcher = re.compile(regex, re.DOTALL)
pattern_sub = len(pattern) - 1
# Join data chunks until they are sufficiently long to contain the pattern,
# i.e. have at least size: `len(pattern)`. Continue joining, if the chunk
# ends with a prefix of the pattern.
@@ -94,7 +99,9 @@ def ends_with_pattern_prefix(data: str | bytes | bytearray,
else:
current_chunk += data_chunk
if len(current_chunk) >= len(pattern) \
and not ends_with_pattern_prefix(current_chunk, pattern):
and not (
current_chunk[-1] in pattern
and pattern_matcher.match(current_chunk, len(current_chunk) - pattern_sub)):
yield current_chunk
current_chunk = None
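
The change above replaces a Python-level loop over all pattern prefixes with a single precompiled regular expression, guarded by a cheap membership test on the last element. The following is a standalone sketch of that idea, assuming `str` or `bytes` input only. The function names `build_prefix_matcher` and `ends_with_pattern_prefix` are chosen for illustration, and the handling of very short chunks is an addition of this sketch, not taken from the PR.

```python
import re


def build_prefix_matcher(pattern):
    """Compile a regex that matches any proper, non-empty prefix of
    `pattern` at the end of a window of len(pattern) - 1 elements."""
    is_text = isinstance(pattern, str)
    dot, end, bar = ('.', '$', '|') if is_text else (b'.', b'$', b'|')
    alternatives = [
        # Pad with wildcards so that every alternative spans the window.
        dot * (len(pattern) - index - 1) + re.escape(pattern[:index]) + end
        for index in range(1, len(pattern))
    ]
    # DOTALL lets the wildcard padding match newlines as well.
    return re.compile(bar.join(alternatives), re.DOTALL)


def ends_with_pattern_prefix(chunk, pattern, matcher):
    """Return True if `chunk` ends with a proper, non-empty pattern prefix."""
    window = len(pattern) - 1
    if window < 1:
        # Patterns of length 0 or 1 have no proper, non-empty prefix.
        return False
    if len(chunk) < window:
        # Too short for the fixed-width window; compare directly.
        return any(
            chunk.endswith(pattern[:i]) for i in range(1, len(chunk) + 1)
        )
    # Cheap pre-check: the last element must occur in the pattern at all.
    if chunk[-1] not in pattern:
        return False
    return matcher.match(chunk, len(chunk) - window) is not None


marker = b'01234'
matcher = build_prefix_matcher(marker)
partial = ends_with_pattern_prefix(b'aaaa012', marker, matcher)
unrelated = ends_with_pattern_prefix(b'aaaa', marker, matcher)
complete = ends_with_pattern_prefix(b'aaa01234', marker, matcher)
```

Here `partial` is true because the chunk ends with the prefix `012`, while `unrelated` and `complete` are false: a chunk that ends with the full pattern does not end with a proper prefix of it, so it can be yielded.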

30 changes: 30 additions & 0 deletions datalad_next/itertools/tests/test_align_pattern.py
@@ -1,5 +1,7 @@
from __future__ import annotations

import timeit

import pytest

from ..align_pattern import align_pattern
@@ -22,3 +24,31 @@
])
def test_pattern_processor(data_chunks, pattern, expected):
assert expected == list(align_pattern(data_chunks, pattern=pattern))


def test_performance():
    # Ensure that the performance of align_pattern is acceptable for large
    # data chunks and patterns.
    number = 10
    pattern = b'01234'
    data_chunks = [b'a' * 1000 for _ in range(100 * 1000)] + [pattern]

    result_base = timeit.timeit(
        lambda: tuple(data_chunks),
        number=number,
    )
    result_iter = timeit.timeit(
        lambda: tuple(align_pattern(data_chunks, pattern=pattern)),
        number=number,
    )

    print(result_base, result_iter, result_iter / result_base)


def test_newline_matches():
    pattern = b'----datalad-end-marker-3654137433-rekram-dne-dalatad----\n'
    chunk1 = b'Have a lot of fun...\n----datalad-end-marker-3654137433-r'
    chunk2 = b'e'
    chunk3 = b'kram-dne-dalatad----\n'
    result = list(align_pattern([chunk1, chunk2, chunk3], pattern))
    assert result == [chunk1 + chunk2 + chunk3]
10 changes: 9 additions & 1 deletion datalad_next/runners/iter_subproc.py
@@ -6,7 +6,10 @@
List,
)

from datalad_next.iterable_subprocess import iterable_subprocess
from datalad_next.iterable_subprocess.iterable_subprocess import (
iterable_subprocess,
OutputFrom,
)
from datalad_next.exceptions import CommandError
from datalad_next.consts import COPY_BUFSIZE

@@ -19,6 +22,7 @@ def iter_subproc(
input: Iterable[bytes] | None = None,
chunk_size: int = COPY_BUFSIZE,
cwd: Path | None = None,
bufsize: int = -1,
):
"""Context manager to communicate with a subprocess using iterables

@@ -88,6 +92,9 @@ def iter_subproc(
Size of chunks to read from the subprocess's stdout/stderr in bytes.
cwd: Path
Working directory for the subprocess, passed to ``subprocess.Popen``.
bufsize: int, optional
Buffer size to use for the subprocess's ``stdin``, ``stdout``, and
``stderr``. See ``subprocess.Popen`` for details.

Returns
-------
@@ -98,4 +105,5 @@ def iter_subproc(
tuple() if input is None else input,
chunk_size=chunk_size,
cwd=cwd,
bufsize=bufsize,
)
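
The new `bufsize` argument is passed through unchanged to `subprocess.Popen`. The sketch below uses plain `subprocess` rather than `iter_subproc` to show the effect that presumably motivates it for a persistent shell: with `bufsize=0` the pipes are unbuffered, so each write reaches the long-running child immediately and a reply can be read before the next command is sent. The child program is a made-up echo loop that stands in for a shell.

```python
import subprocess
import sys

# Hypothetical child: echoes each input line in upper case and flushes.
child_code = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line.upper())\n"
    "    sys.stdout.flush()\n"
)

with subprocess.Popen(
    [sys.executable, '-c', child_code],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    bufsize=0,  # unbuffered pipes: no explicit flush() needed on our side
) as proc:
    # Two request/response rounds against the same running process.
    proc.stdin.write(b'hello\n')
    first = proc.stdout.readline()
    proc.stdin.write(b'again\n')
    second = proc.stdout.readline()
    # Closing stdin ends the child's loop, so the process can exit.
    proc.stdin.close()
```

With the default `bufsize=-1`, the writes would sit in a buffered writer until flushed, and the first `readline()` could block indefinitely.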
171 changes: 171 additions & 0 deletions datalad_next/shell/__init__.py
@@ -0,0 +1,171 @@
"""A persistent shell connection

This module provides a context manager that establishes a connection to a shell
and can be used to execute multiple commands in that shell. Shells are usually
remote shells, e.g. connected via an ``ssh``-client, but local shells like
``zsh``, ``bash`` or ``PowerShell`` can also be used.

The context manager returns an instance of :class:`ShellCommandExecutor` that
can be used to execute commands in the shell via the method
:meth:`ShellCommandExecutor.__call__`. The method will return an instance of
a subclass of :class:`ShellCommandResponseGenerator` that can be used to
retrieve the output of the command, the result code of the command, and the
stderr-output of the command.

Every response generator expects a certain output structure. It is responsible
for ensuring that the output structure is generated. To this end every
response generator provides a method
:meth:`ShellCommandResponseGenerator.get_command_list`. The method
:meth:`ShellCommandExecutor.__call__` will pass the user-provided command to
:meth:`ShellCommandResponseGenerator.get_command_list` and receive a list of
final commands that should be executed in the connected shell and that will
generate the expected output structure. Instances of
:class:`ShellCommandResponseGenerator` therefore have four tasks:

1. Create a final command list that is used to execute the user-provided
command. This could, for example, execute the command, print an
end marker, and print the return code of the command.

2. Parse the output of the command and yield it to the user.

3. Read the return code and provide it to the user.

4. Provide stderr-output to the user.

A very versatile example of a response generator is the class
:class:`VariableLengthResponseGenerator`. It can be used to execute a command
that will result in an output of unknown length, e.g. ``ls``, and will yield
the output of the command to the user. It does that by using a random
*end marker* to detect the end of the output and read the trailing return code.
This is suitable for almost all commands.

If :class:`VariableLengthResponseGenerator` is so versatile, why not just
implement its functionality in :class:`ShellCommandExecutor`? There are two
major reasons for that:

1. Although the :class:`VariableLengthResponseGenerator` is very versatile,
it is not the most efficient implementation for commands that produce large
amounts of output. In addition, there is also a minimal risk that the end
marker is part of the output of the command, which would trip up the response
generator. Putting response generation into a separate class makes it
possible to implement specific operations more efficiently and more safely.
For example,
:class:`DownloadResponseGenerator` implements the download of files. It
takes a remote file name as user "command" and creates a final command list
that emits the length of the file, a newline, the file content, a return
code, and a newline. This allows :class:`DownloadResponseGenerator`
to parse the output without relying on an end marker, thus increasing
efficiency and safety.

2. Factoring out the response generation creates an interface that can be used
to support the syntax of different shells and the differences in command
names and options between operating systems. For example, the response
generator class :class:`VariableLengthResponseGeneratorPowerShell` supports
the invocation of commands with variable length output in ``PowerShell``.

In short, response generator classes encapsulate details of shell syntax and
operation implementation. This allows support for different shell syntaxes and
the efficient implementation of specific higher-level operations, e.g.
``download``. It also allows users to extend the functionality of
:class:`ShellCommandExecutor` by providing their own response generator
classes.

The module :mod:`datalad_next.shell.response_generators` provides two generally
applicable abstract response generator classes:

- :class:`VariableLengthResponseGenerator`

- :class:`FixedLengthResponseGenerator`

The functionality of the former is described above. The latter can be used to
execute a command that will result in output of known
length, e.g. ``echo -n 012345``. It reads the specified number of bytes and a
trailing return code. This is more performant than the variable length response
generator (because it does not have to search for the end marker). In addition,
it does not rely on the uniqueness of the end marker. It is most useful for
operations like ``download``, where the length of the output can be known in
advance.

As mentioned above, the classes :class:`VariableLengthResponseGenerator` and
:class:`FixedLengthResponseGenerator` are abstract. The module
:mod:`datalad_next.shell.response_generators` provides the following concrete
implementations for them:

- :class:`VariableLengthResponseGeneratorPosix`

- :class:`VariableLengthResponseGeneratorPowerShell`

- :class:`FixedLengthResponseGeneratorPosix`

- :class:`FixedLengthResponseGeneratorPowerShell`

When :func:`shell` is executed it will use a
:class:`VariableLengthResponseGenerator` to skip the login message of the shell.
This is done by executing a *zero command* (a command that will possibly
generate some output, and successfully return) in the shell. The zero command is
provided by the concrete implementation of class
:class:`VariableLengthResponseGenerator`. For example, the zero command for
POSIX shells is ``test 0 -eq 0``, for PowerShell it is ``Write-Host hello``.

Because there is no way for :func:`shell` to determine the kind of shell it
connects to, the user can provide an alternative response generator class in
the ``zero_command_rg_class`` parameter. An instance of that class
will then be used to execute the zero command. Currently, the following two
response generator classes are available:

- :class:`VariableLengthResponseGeneratorPosix`: works with POSIX-compliant
shells, e.g. ``sh`` or ``bash``. This is the default.
- :class:`VariableLengthResponseGeneratorPowerShell`: works with PowerShell.

Whenever a command is executed via :meth:`ShellCommandExecutor.__call__`, the
class identified by ``zero_command_rg_class`` will be used by default to create
the final command list and to parse the result. Users can override this on a
per-call basis by providing a different response generator class in the
``response_generator``-parameter of :meth:`ShellCommandExecutor.__call__`.

.. currentmodule:: datalad_next.shell

.. autosummary::
:toctree: generated
:recursive:

ShellCommandExecutor
ShellCommandResponseGenerator
VariableLengthResponseGenerator
VariableLengthResponseGeneratorPosix
VariableLengthResponseGeneratorPowerShell
FixedLengthResponseGenerator
FixedLengthResponseGeneratorPosix
FixedLengthResponseGeneratorPowerShell
DownloadResponseGenerator
DownloadResponseGeneratorPosix
operations.posix.upload
operations.posix.download
operations.posix.delete
"""


__all__ = [
'shell',
'posix',
]

from .shell import (
shell,
ShellCommandExecutor,
)

from .operations import posix
from .operations.posix import (
DownloadResponseGenerator,
DownloadResponseGeneratorPosix,
)
from .response_generators import (
FixedLengthResponseGenerator,
FixedLengthResponseGeneratorPosix,
FixedLengthResponseGeneratorPowerShell,
ShellCommandResponseGenerator,
VariableLengthResponseGenerator,
VariableLengthResponseGeneratorPosix,
VariableLengthResponseGeneratorPowerShell,
)
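
The module docstring above describes two output protocols: one that detects the end of the output through a random end marker, and one that reads a known number of bytes. The sketch below illustrates both on raw bytes, without a shell connection. All function names and the exact POSIX command wrapping are assumptions made for this illustration; they are not the implementation in `datalad_next.shell.response_generators`.

```python
from __future__ import annotations

import secrets


def make_end_marker() -> bytes:
    # A random marker makes a collision with command output unlikely.
    return b'----end-marker-' + secrets.token_hex(8).encode() + b'----'


def build_command_list(command: bytes, end_marker: bytes) -> list[bytes]:
    """Task 1: wrap the user command so that the shell prints the end
    marker and the return code after the command output (POSIX syntax)."""
    return [
        command + b'\n',
        b'_rc=$?\n',
        b"printf '%s\\n' '" + end_marker + b"'\n",
        b'printf \'%s\\n\' "$_rc"\n',
    ]


def parse_variable_length(raw: bytes, end_marker: bytes) -> tuple[bytes, int]:
    """Tasks 2 and 3: split raw output at the marker, read the return code."""
    stdout, found, rest = raw.partition(end_marker + b'\n')
    if not found:
        raise ValueError('end marker not found in shell output')
    return stdout, int(rest.split(b'\n', 1)[0])


def parse_fixed_length(raw: bytes, length: int) -> tuple[bytes, int]:
    """Fixed-length variant: read `length` bytes, then the return code.
    No marker search is needed, and the payload may contain any bytes."""
    if len(raw) < length:
        raise ValueError('shell output is shorter than the announced length')
    return raw[:length], int(raw[length:].split(b'\n', 1)[0])


end_marker = make_end_marker()
commands = build_command_list(b'ls', end_marker)
# Simulated shell output for the wrapped command.
simulated = b'file-a\nfile-b\n' + end_marker + b'\n' + b'0\n'
stdout, returncode = parse_variable_length(simulated, end_marker)
```

The fixed-length parser never inspects the payload, which is why it stays correct even if the payload happens to contain something that looks like an end marker.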
Comment on lines +158 to +171
Member


Should there be a common naming/selection strategy?

Client code must choose the current implementation for their use case, but it seems a simple label for a shell-setup is not sufficient. We have a posix operations module, but none for response generators.

Why not have one posix module and it has all pieces needed?

Along the same lines, I think we should also avoid that client code has to switch class names per use case. Now they need to select a module to import from and also what to import. This means that client code also needs to maintain a mapping of the API. If we provided modules with the exact same API, client code could be simplified to just get the label of the "platform/shell-type" module to work with.

I also want to point out that the promise is that everything imported from datalad_next.shell is stable (or needs a minor/major release to change). We import the entire posix module, which not only has the pieces that aim to be a "public" API. I think this calls for a dedicated module that exposes the API.
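
One way to read the suggestion above is a set of per-platform namespaces that expose identical names, selected by a single label. The sketch below is purely illustrative and assumes nothing about how the PR resolved this comment: the classes, the `PLATFORMS` registry, and `get_platform` are hypothetical. Only the two zero commands are taken from the module docstring in this diff.

```python
import shlex


class PosixShell:
    """Hypothetical POSIX flavour of a uniform per-platform API."""
    zero_command = b'test 0 -eq 0'

    @staticmethod
    def delete_command(path: str) -> bytes:
        # shlex.quote protects spaces and shell metacharacters in the path.
        return ('rm -f ' + shlex.quote(path)).encode()


class PowerShell:
    """Hypothetical PowerShell flavour with the same attribute names."""
    zero_command = b'Write-Host hello'

    @staticmethod
    def delete_command(path: str) -> bytes:
        # Inside a single-quoted PowerShell string, a quote is doubled.
        quoted = "'" + path.replace("'", "''") + "'"
        return ('Remove-Item -Force -LiteralPath ' + quoted).encode()


# Client code only needs the label, not platform-specific class names.
PLATFORMS = {'posix': PosixShell, 'powershell': PowerShell}


def get_platform(label: str):
    try:
        return PLATFORMS[label]
    except KeyError:
        raise ValueError(f'unknown shell platform label: {label!r}') from None


platform = get_platform('posix')
command = platform.delete_command('my file.txt')
```

With this shape, switching platforms changes one string in client code, and only the names exposed through the registry would need to be treated as stable API.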

Empty file.