Skip to content

Commit

Permalink
issue #406: clean up DiagLogStream handling and connect() failure.
Browse files Browse the repository at this point in the history
When Stream.connect() fails, have it just use on_disconnect(). Now there
is a single disconnect cleanup path.

Remove cutpasted DiagLogStream setup/destruction, and move it into the
base class (temporarily), and only manage the lifetime of its underlying
FD via Side.close().  This cures another EBADF failure.
  • Loading branch information
dw committed Nov 4, 2018
1 parent e01c8f2 commit 802de6a
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 73 deletions.
18 changes: 4 additions & 14 deletions mitogen/doas.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False

#: Once connected, points to the corresponding DiagLogStream, allowing it
#: to be disconnected at the same time this stream is being torn down.
tty_stream = None

username = 'root'
password = None
doas_path = 'doas'
Expand All @@ -75,10 +71,6 @@ def connect(self):
super(Stream, self).connect()
self.name = u'doas.' + mitogen.core.to_text(self.username)

def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)

def get_boot_command(self):
bits = [self.doas_path, '-u', self.username, '--']
bits = bits + super(Stream, self).get_boot_command()
Expand All @@ -88,15 +80,13 @@ def get_boot_command(self):
password_incorrect_msg = 'doas password is incorrect'
password_required_msg = 'doas password is required'

def _connect_bootstrap(self, extra_fd):
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self)

password_sent = False
def _connect_bootstrap(self):
it = mitogen.parent.iter_read(
fds=[self.receive_side.fd, extra_fd],
fds=[self.receive_side.fd, self.diag_stream.receive_side.fd],
deadline=self.connect_deadline,
)

password_sent = False
for buf in it:
LOG.debug('%r: received %r', self, buf)
if buf.endswith(self.EC0_MARKER):
Expand All @@ -111,7 +101,7 @@ def _connect_bootstrap(self, extra_fd):
if password_sent:
raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password')
self.tty_stream.transmit_side.write(
self.diag_stream.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8')
)
password_sent = True
Expand Down
2 changes: 1 addition & 1 deletion mitogen/fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,6 @@ def _child_main(self, childfp):
# Don't trigger atexit handlers, they were copied from the parent.
os._exit(0)

def _connect_bootstrap(self, extra_fd):
def _connect_bootstrap(self):
# None required.
pass
70 changes: 45 additions & 25 deletions mitogen/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,33 @@ class Stream(mitogen.core.Stream):
#: ExternalContext.main().
max_message_size = None

#: If :attr:`create_child` supplied a diag_fd, references the corresponding
#: :class:`DiagLogStream`, allowing it to be disconnected when this stream
#: is disconnected. Set to :data:`None` if no `diag_fd` was present.
diag_stream = None

#: Function with the semantics of :func:`create_child` used to create the
#: child process.
create_child = staticmethod(create_child)

#: Dictionary of extra kwargs passed to :attr:`create_child`.
create_child_args = {}

#: :data:`True` if the remote has indicated that it intends to detach, and
#: should not be killed on disconnect.
detached = False

#: If :data:`True`, indicates the child should not be killed during
#: graceful detachment, as it the actual process implementing the child
#: context. In all other cases, the subprocess is SSH, sudo, or a similar
#: tool that should be reminded to quit during disconnection.
child_is_immediate_subprocess = True

#: Prefix given to default names generated by :meth:`connect`.
name_prefix = u'local'

_reaped = False

def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core'])
Expand Down Expand Up @@ -976,15 +1003,6 @@ def on_shutdown(self, broker):
)
)

#: If :data:`True`, indicates the subprocess managed by us should not be
#: killed during graceful detachment, as it the actual process implementing
#: the child context. In all other cases, the subprocess is SSH, sudo, or a
#: similar tool that should be reminded to quit during disconnection.
child_is_immediate_subprocess = True

detached = False
_reaped = False

def _reap_child(self):
"""
Reap the child process during disconnection.
Expand Down Expand Up @@ -1024,8 +1042,10 @@ def _reap_child(self):
raise

def on_disconnect(self, broker):
self._reap_child()
super(Stream, self).on_disconnect(broker)
if self.diag_stream is not None:
self.diag_stream.on_disconnect(broker)
self._reap_child()

# Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups
# file descriptor 0 as 100, creates a pipe, then execs a new interpreter
Expand Down Expand Up @@ -1129,10 +1149,6 @@ def get_preamble(self):
)
return zlib.compress(source.encode('utf-8'), 9)

create_child = staticmethod(create_child)
create_child_args = {}
name_prefix = u'local'

def start_child(self):
args = self.get_boot_command()
try:
Expand All @@ -1154,26 +1170,28 @@ def _adorn_eof_error(self, e):

def connect(self):
LOG.debug('%r.connect()', self)
self.pid, fd, extra_fd = self.start_child()
self.pid, fd, diag_fd = self.start_child()
self.name = u'%s.%s' % (self.name_prefix, self.pid)
self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r',
self, self.receive_side.fd)
if diag_fd is not None:
self.diag_stream = DiagLogStream(diag_fd, self)
else:
self.diag_stream = None

LOG.debug('%r.connect(): stdin=%r, stdout=%r, diag=%r',
self, self.receive_side.fd, self.transmit_side.fd,
self.diag_stream and self.diag_stream.receive_side.fd)

try:
self._connect_bootstrap(extra_fd)
self._connect_bootstrap()
except EofError:
self.receive_side.close()
self.transmit_side.close()
self.on_disconnect(self._router.broker)
e = sys.exc_info()[1]
self._adorn_eof_error(e)
raise
except Exception:
self.receive_side.close()
self.transmit_side.close()
if extra_fd is not None:
os.close(extra_fd)
self.on_disconnect(self._router.broker)
self._reap_child()
raise

Expand All @@ -1188,8 +1206,10 @@ def _ec0_received(self):
write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, self.EC1_MARKER,
self.connect_deadline)
if self.diag_stream:
self._router.broker.start_receive(self.diag_stream)

def _connect_bootstrap(self, extra_fd):
def _connect_bootstrap(self):
discard_until(self.receive_side.fd, self.EC0_MARKER,
self.connect_deadline)
self._ec0_received()
Expand Down
25 changes: 5 additions & 20 deletions mitogen/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ class Stream(mitogen.parent.Stream):
#: Number of -v invocations to pass on command line.
ssh_debug_level = 0

#: If batch_mode=False, points to the corresponding DiagLogStream, allowing
#: it to be disconnected at the same time this stream is being torn down.
tty_stream = None

#: The path to the SSH binary.
ssh_path = 'ssh'

Expand Down Expand Up @@ -195,11 +191,6 @@ def _init_create_child(self):
'stderr_pipe': True,
}

def on_disconnect(self, broker):
if self.tty_stream is not None:
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)

def get_boot_command(self):
bits = [self.ssh_path]
if self.ssh_debug_level:
Expand Down Expand Up @@ -265,24 +256,18 @@ def connect(self):
def _host_key_prompt(self):
if self.check_host_keys == 'accept':
LOG.debug('%r: accepting host key', self)
self.tty_stream.transmit_side.write(b('yes\n'))
self.diag_stream.transmit_side.write(b('yes\n'))
return

# _host_key_prompt() should never be reached with ignore or enforce
# mode, SSH should have handled that. User's ssh_args= is conflicting
# with ours.
raise HostKeyError(self.hostkey_config_msg)

def _ec0_received(self):
if self.tty_stream is not None:
self._router.broker.start_receive(self.tty_stream)
return super(Stream, self)._ec0_received()

def _connect_bootstrap(self, extra_fd):
def _connect_bootstrap(self):
fds = [self.receive_side.fd]
if extra_fd is not None:
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self)
fds.append(extra_fd)
if self.diag_stream is not None:
fds.append(self.diag_stream.receive_side.fd)

it = mitogen.parent.iter_read(fds=fds, deadline=self.connect_deadline)

Expand Down Expand Up @@ -311,7 +296,7 @@ def _connect_bootstrap(self, extra_fd):
if self.password is None:
raise PasswordError(self.password_required_msg)
LOG.debug('%r: sending password', self)
self.tty_stream.transmit_side.write(
self.diag_stream.transmit_side.write(
(self.password + '\n').encode()
)
password_sent = True
Expand Down
5 changes: 1 addition & 4 deletions mitogen/su.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,14 @@ def connect(self):
super(Stream, self).connect()
self.name = u'su.' + mitogen.core.to_text(self.username)

def on_disconnect(self, broker):
super(Stream, self).on_disconnect(broker)

def get_boot_command(self):
argv = mitogen.parent.Argv(super(Stream, self).get_boot_command())
return [self.su_path, self.username, '-c', str(argv)]

password_incorrect_msg = 'su password is incorrect'
password_required_msg = 'su password is required'

def _connect_bootstrap(self, extra_fd):
def _connect_bootstrap(self):
password_sent = False
it = mitogen.parent.iter_read(
fds=[self.receive_side.fd],
Expand Down
12 changes: 5 additions & 7 deletions mitogen/sudo.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,6 @@ def connect(self):
super(Stream, self).connect()
self.name = u'sudo.' + mitogen.core.to_text(self.username)

def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)

def get_boot_command(self):
# Note: sudo did not introduce long-format option processing until July
# 2013, so even though we parse long-format options, supply short-form
Expand All @@ -177,12 +173,14 @@ def get_boot_command(self):
password_incorrect_msg = 'sudo password is incorrect'
password_required_msg = 'sudo password is required'

def _connect_bootstrap(self, extra_fd):
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self)
def _connect_bootstrap(self):
fds = [self.receive_side.fd]
if self.diag_stream is not None:
fds.append(self.diag_stream.receive_side.fd)

password_sent = False
it = mitogen.parent.iter_read(
fds=[self.receive_side.fd, extra_fd],
fds=fds,
deadline=self.connect_deadline,
)

Expand Down
7 changes: 6 additions & 1 deletion tests/data/stubs/stub-doas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

import json
import os
import subprocess
import sys

os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv)
os.environ['THIS_IS_STUB_DOAS'] = '1'
os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:])

# This must be a child process and not exec() since Mitogen replaces its stderr
# descriptor, causing the last user of the slave PTY to close it, resulting in
# the master side indicating EIO.
subprocess.check_call(sys.argv[sys.argv.index('--') + 1:])
7 changes: 6 additions & 1 deletion tests/data/stubs/stub-sudo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

import json
import os
import subprocess
import sys

os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv)
os.environ['THIS_IS_STUB_SUDO'] = '1'
os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:])

# This must be a child process and not exec() since Mitogen replaces its stderr
# descriptor, causing the last user of the slave PTY to close it, resulting in
# the master side indicating EIO.
subprocess.check_call(sys.argv[sys.argv.index('--') + 1:])

0 comments on commit 802de6a

Please sign in to comment.