Skip to content

Commit

Permalink
Merge pull request #596 from christian-monch/ssh-shell
Browse files Browse the repository at this point in the history
Add `PersistentSubShell`-feature
  • Loading branch information
mih authored Apr 18, 2024
2 parents 401af7b + ce445a8 commit 5a12b73
Show file tree
Hide file tree
Showing 17 changed files with 2,010 additions and 43 deletions.
2 changes: 2 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ source =
[report]
# show lines missing coverage in output
show_missing = True
exclude_also =
raise NotImplementedError
41 changes: 25 additions & 16 deletions datalad_next/iterable_subprocess/iterable_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,30 @@
from datalad_next.exceptions import CommandError


class OutputFrom(Generator):
    """Generator that yields a stream's content in fixed-size chunks.

    Each iteration step reads up to ``chunk_size`` units from ``stdout``
    and yields the result; iteration ends when a read returns an empty
    value.

    Parameters
    ----------
    stdout
        Object with a ``read(size)`` method, e.g. the stdout pipe of a
        subprocess.
    stderr_deque
        Stored on the instance as-is; the methods of this class do not
        use it. Presumably holds the most recent stderr output of the
        subprocess -- confirm against the caller.
    chunk_size: int, optional
        Maximum size passed to every ``stdout.read()`` call.
    """

    def __init__(self, stdout, stderr_deque, chunk_size=65536):
        # Not set by this class; starts out as ``None``.
        self.returncode = None
        self.chunk_size = chunk_size
        self.stderr_deque = stderr_deque
        self.stdout = stdout

    def send(self, _):
        """Return the next chunk; the sent value is ignored.

        Raises ``StopIteration`` once ``read()`` returns an empty result.
        """
        data = self.stdout.read(self.chunk_size)
        if data:
            return data
        raise StopIteration

    def throw(self, typ, value=None, traceback=None):
        """Delegate to the default ``Generator.throw`` implementation.

        ``throw`` is abstract in ``Generator`` and therefore has to be
        defined here, although no custom handling is required.
        """
        return super().throw(typ, value, traceback)


@contextmanager
def iterable_subprocess(
program,
input_chunks,
chunk_size=65536,
cwd=None,
bufsize=-1,
):
# This context starts a thread that populates the subprocess's standard input. It
# also starts a thread that reads the process's standard error. Otherwise we risk
Expand Down Expand Up @@ -105,20 +123,6 @@ def input_to(stdin):
if e.errno != 22:
raise

class OutputFrom(Generator):
def __init__(self, stdout):
self.stdout = stdout
self.returncode = None

def send(self, _):
chunk = self.stdout.read(chunk_size)
if not chunk:
raise StopIteration
return chunk

def throw(self, typ, value=None, traceback=None):
return super().throw(typ, value, traceback)

def keep_only_most_recent(stderr, stderr_deque):
total_length = 0
while True:
Expand All @@ -144,12 +148,13 @@ def raise_if_not_none(exception):
try:

with \
Popen(
Popen( # nosec - all arguments are controlled by the caller
program,
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
cwd=cwd,
bufsize=bufsize,
) as proc, \
thread(
keep_only_most_recent,
Expand All @@ -164,7 +169,11 @@ def raise_if_not_none(exception):
try:
start_t_stderr()
start_t_stdin()
chunk_generator = OutputFrom(proc.stdout)
chunk_generator = OutputFrom(
proc.stdout,
stderr_deque,
chunk_size
)
yield chunk_generator
except BaseException:
proc.terminate()
Expand Down
27 changes: 17 additions & 10 deletions datalad_next/itertools/align_pattern.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import re
from typing import (
Generator,
Iterable,
Expand Down Expand Up @@ -74,15 +75,19 @@ def align_pattern(iterable: Iterable[str | bytes | bytearray],
pattern multiple times.
"""

def ends_with_pattern_prefix(data: str | bytes | bytearray,
pattern: str | bytes | bytearray,
) -> bool:
""" Check whether the chunk ends with a prefix of the pattern """
for index in range(len(pattern) - 1, 0, -1):
if data[-index:] == pattern[:index]:
return True
return False

# Create a pattern matcher for all non-empty proper prefixes of the pattern
if isinstance(pattern, str):
regex: str | bytes | bytearray = '(' + '|'.join(
'.' * (len(pattern) - index - 1) + re.escape(pattern[:index]) + '$'
for index in range(1, len(pattern))
) + ')'
else:
regex = b'(' + b'|'.join(
b'.' * (len(pattern) - index - 1) + re.escape(pattern[:index]) + b'$'
for index in range(1, len(pattern))
) + b')'
pattern_matcher = re.compile(regex, re.DOTALL)
pattern_sub = len(pattern) - 1
# Join data chunks until they are sufficiently long to contain the pattern,
# i.e. have at least size: `len(pattern)`. Continue joining, if the chunk
# ends with a prefix of the pattern.
Expand All @@ -94,7 +99,9 @@ def ends_with_pattern_prefix(data: str | bytes | bytearray,
else:
current_chunk += data_chunk
if len(current_chunk) >= len(pattern) \
and not ends_with_pattern_prefix(current_chunk, pattern):
and not (
current_chunk[-1] in pattern
and pattern_matcher.match(current_chunk, len(current_chunk) - pattern_sub)):
yield current_chunk
current_chunk = None

Expand Down
30 changes: 30 additions & 0 deletions datalad_next/itertools/tests/test_align_pattern.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import timeit

import pytest

from ..align_pattern import align_pattern
Expand All @@ -22,3 +24,31 @@
])
def test_pattern_processor(data_chunks, pattern, expected):
assert expected == list(align_pattern(data_chunks, pattern=pattern))


def test_performance():
    """Time ``align_pattern`` against plain iteration and print the result.

    The input consists of 100,000 chunks of 1,000 bytes each, followed by
    one chunk that holds the pattern. The timings and their ratio are
    printed for manual inspection; no threshold is asserted.
    """
    pattern = b'01234'
    data_chunks = [b'a' * 1000 for _ in range(100 * 1000)] + [pattern]
    number = 10

    def consume_plain():
        # Baseline: iterate over the chunks without any alignment.
        return tuple(data_chunks)

    def consume_aligned():
        # Same iteration, routed through `align_pattern`.
        return tuple(align_pattern(data_chunks, pattern=pattern))

    result_base = timeit.timeit(consume_plain, number=number)
    result_iter = timeit.timeit(consume_aligned, number=number)

    print(result_base, result_iter, result_iter / result_base)


def test_newline_matches():
    """A pattern containing a newline that is split over three chunks must
    lead to all three chunks being joined into a single output chunk."""
    pattern = b'----datalad-end-marker-3654137433-rekram-dne-dalatad----\n'
    chunks = [
        b'Have a lot of fun...\n----datalad-end-marker-3654137433-r',
        b'e',
        b'kram-dne-dalatad----\n',
    ]
    aligned = list(align_pattern(chunks, pattern))
    assert aligned == [chunks[0] + chunks[1] + chunks[2]]
10 changes: 9 additions & 1 deletion datalad_next/runners/iter_subproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
List,
)

from datalad_next.iterable_subprocess import iterable_subprocess
from datalad_next.iterable_subprocess.iterable_subprocess import (
iterable_subprocess,
OutputFrom,
)
from datalad_next.exceptions import CommandError
from datalad_next.consts import COPY_BUFSIZE

Expand All @@ -19,6 +22,7 @@ def iter_subproc(
input: Iterable[bytes] | None = None,
chunk_size: int = COPY_BUFSIZE,
cwd: Path | None = None,
bufsize: int = -1,
):
"""Context manager to communicate with a subprocess using iterables
Expand Down Expand Up @@ -88,6 +92,9 @@ def iter_subproc(
Size of chunks to read from the subprocess's stdout/stderr in bytes.
cwd: Path
Working directory for the subprocess, passed to ``subprocess.Popen``.
bufsize: int, optional
Buffer size to use for the subprocess's ``stdin``, ``stdout``, and
``stderr``. See ``subprocess.Popen`` for details.
Returns
-------
Expand All @@ -98,4 +105,5 @@ def iter_subproc(
tuple() if input is None else input,
chunk_size=chunk_size,
cwd=cwd,
bufsize=bufsize,
)
171 changes: 171 additions & 0 deletions datalad_next/shell/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""A persistent shell connection
This module provides a context manager that establishes a connection to a shell
and can be used to execute multiple commands in that shell. Shells are usually
remote shells, e.g. connected via an ``ssh``-client, but local shells like
``zsh``, ``bash`` or ``PowerShell`` can also be used.
The context manager returns an instance of :class:`ShellCommandExecutor` that
can be used to execute commands in the shell via the method
:meth:`ShellCommandExecutor.__call__`. The method will return an instance of
a subclass of :class:`ShellCommandResponseGenerator` that can be used to
retrieve the output of the command, the result code of the command, and the
stderr-output of the command.
Every response generator expects a certain output structure. It is responsible
for ensuring that the output structure is generated. To this end every
response generator provides a method
:meth:`ShellCommandResponseGenerator.get_command_list`. The method
:meth:`ShellCommandExecutor.__call__` will pass the user-provided command to
:meth:`ShellCommandResponseGenerator.get_command_list` and receive a list of
final commands that should be executed in the connected shell and that will
generate the expected output structure. Instances of
:class:`ShellCommandResponseGenerator` therefore have four tasks:
1. Create a final command list that is used to execute the user provided
command. This could, for example, execute the command, print an
end marker, and print the return code of the command.
2. Parse the output of the command, yield it to the user.
3. Read the return code and provide it to the user.
4. Provide stderr-output to the user.
A very versatile example of a response generator is the class
:class:`VariableLengthResponseGenerator`. It can be used to execute a command
that will result in an output of unknown length, e.g. ``ls``, and will yield
the output of the command to the user. It does that by using a random
*end marker* to detect the end of the output and read the trailing return code.
This is suitable for almost all commands.
If :class:`VariableLengthResponseGenerator` is so versatile, why not just
implement its functionality in :class:`ShellCommandExecutor`? There are two
major reasons for that:
1. Although the :class:`VariableLengthResponseGenerator` is very versatile,
it is not the most efficient implementation for commands that produce large
amounts of output. In addition, there is also a minimal risk that the end
marker is part of the output of the command, which would trip up the response
generator. Putting response generation into a separate class allows
implementing specific operations more efficiently and more safely.
For example,
:class:`DownloadResponseGenerator` implements the download of files. It
takes a remote file name as user "command" and creates a final command list
that emits the length of the file, a newline, the file content, a return
code, and a newline. This allows :class:`DownloadResponseGenerator`
to parse the output without relying on an end marker, thus increasing
efficiency and safety.
2. Factoring out the response generation creates an interface that can be used
to support the syntax of different shells and the difference in command
names and options in different operating systems. For example, the response
generator class :class:`VariableLengthResponseGeneratorPowerShell` supports
the invocation of commands with variable length output in a ``PowerShell``.
In short, response generator classes encapsulate details of shell-syntax and
operation implementation. That allows support of different shell syntax, and
the efficient implementation of specific higher level operations, e.g.
``download``. It also allows users to extend the functionality of
:class:`ShellCommandExecutor` by providing their own response generator
classes.
The module :mod:`datalad_next.shell.response_generators` provides two generally
applicable abstract response generator classes:
- :class:`VariableLengthResponseGenerator`
- :class:`FixedLengthResponseGenerator`
The functionality of the former is described above. The latter can be used to
execute a command that will result in output of known
length, e.g. ``echo -n 012345``. It reads the specified number of bytes and a
trailing return code. This is more performant than the variable length response
generator (because it does not have to search for the end marker). In addition,
it does not rely on the uniqueness of the end marker. It is most useful for
operations like ``download``, where the length of the output can be known in
advance.
As mentioned above, the classes :class:`VariableLengthResponseGenerator` and
:class:`FixedLengthResponseGenerator` are abstract. The module
:mod:`datalad_next.shell.response_generators` provides the following concrete
implementations for them:
- :class:`VariableLengthResponseGeneratorPosix`
- :class:`VariableLengthResponseGeneratorPowerShell`
- :class:`FixedLengthResponseGeneratorPosix`
- :class:`FixedLengthResponseGeneratorPowerShell`
When :func:`shell` is executed it will use a
:class:`VariableLengthResponseGenerator` to skip the login message of the shell.
This is done by executing a *zero command* (a command that will possibly
generate some output, and successfully return) in the shell. The zero command is
provided by the concrete implementation of class
:class:`VariableLengthResponseGenerator`. For example, the zero command for
POSIX shells is ``test 0 -eq 0``, for PowerShell it is ``Write-Host hello``.
Because there is no way for :func:`shell` to determine the kind of shell it
connects to, the user can provide an alternative response generator class, in
the ``zero_command_rg_class``-parameter. Instances of that class
will then be used to execute the zero command. Currently, the following two
response generator classes are available:
- :class:`VariableLengthResponseGeneratorPosix`: works with POSIX-compliant
shells, e.g. ``sh`` or ``bash``. This is the default.
- :class:`VariableLengthResponseGeneratorPowerShell`: works with PowerShell.
Whenever a command is executed via :meth:`ShellCommandExecutor.__call__`, the
class identified by ``zero_command_rg_class`` will be used by default to create
the final command list and to parse the result. Users can override this on a
per-call basis by providing a different response generator class in the
``response_generator``-parameter of :meth:`ShellCommandExecutor.__call__`.
.. currentmodule:: datalad_next.shell
.. autosummary::
:toctree: generated
:recursive:
ShellCommandExecutor
ShellCommandResponseGenerator
VariableLengthResponseGenerator
VariableLengthResponseGeneratorPosix
VariableLengthResponseGeneratorPowerShell
FixedLengthResponseGenerator
FixedLengthResponseGeneratorPosix
FixedLengthResponseGeneratorPowerShell
DownloadResponseGenerator
DownloadResponseGeneratorPosix
operations.posix.upload
operations.posix.download
operations.posix.delete
"""


# Names bound by a star-import of this package
# (``from datalad_next.shell import *``).
__all__ = [
'shell',
'posix',
]

from .shell import (
shell,
ShellCommandExecutor,
)

from .operations import posix
from .operations.posix import (
DownloadResponseGenerator,
DownloadResponseGeneratorPosix,
)
from .response_generators import (
FixedLengthResponseGenerator,
FixedLengthResponseGeneratorPosix,
FixedLengthResponseGeneratorPowerShell,
ShellCommandResponseGenerator,
VariableLengthResponseGenerator,
VariableLengthResponseGeneratorPosix,
VariableLengthResponseGeneratorPowerShell,
)
Empty file.
Loading

0 comments on commit 5a12b73

Please sign in to comment.