Skip to content

Commit

Permalink
Merge pull request #8 from minrk/badfd
Browse files Browse the repository at this point in the history
use control pipe to signal closure
  • Loading branch information
minrk authored Jun 22, 2017
2 parents e438edd + 6e2e906 commit d3dfa51
Showing 1 changed file with 49 additions and 12 deletions.
61 changes: 49 additions & 12 deletions wurlitzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

from contextlib import contextmanager
import ctypes
import errno
from fcntl import fcntl, F_GETFL, F_SETFL
import io
import os
import select
import sys
import threading
import time
import warnings

libc = ctypes.CDLL(None)
Expand All @@ -42,6 +44,23 @@
# don't respect ascii
_default_encoding = 'utf8' # pragma: no cover

def dup2(a, b, timeout=3):
"""Like os.dup2, but retry on EBUSY"""
dup_err = None
# give FDs 3 seconds to not be busy anymore
for i in range(int(10 * timeout)):
try:
return os.dup2(a, b)
except OSError as e:
dup_err = e
if e.errno == errno.EBUSY:
time.sleep(0.1)
else:
raise
if dup_err:
raise dup_err


class Wurlitzer(object):
"""Class for Capturing Process-level FD output via dup2
Expand Down Expand Up @@ -78,7 +97,7 @@ def _setup_pipe(self, name):
self._save_fds[name] = save_fd

pipe_out, pipe_in = os.pipe()
os.dup2(pipe_in, real_fd)
dup2(pipe_in, real_fd)
os.close(pipe_in)
self._real_fds[name] = real_fd

Expand All @@ -90,7 +109,7 @@ def _setup_pipe(self, name):
def _decode(self, data):
"""Decode data, if any
Called before pasing to stdout/stderr streams
Called before passing to stdout/stderr streams
"""
if self.encoding:
data = data.decode(self.encoding, 'replace')
Expand Down Expand Up @@ -118,10 +137,11 @@ def __enter__(self):
libc.fflush(c_stderr_p)
# setup handle
self._setup_handle()
self._control_r, self._control_w = os.pipe()

# create pipe for stdout
pipes = []
names = {}
pipes = [self._control_r]
names = {self._control_r: 'control'}
if self._stdout:
pipe = self._setup_pipe('stdout')
pipes.append(pipe)
Expand All @@ -130,29 +150,44 @@ def __enter__(self):
pipe = self._setup_pipe('stderr')
pipes.append(pipe)
names[pipe] = 'stderr'

def forwarder():
"""Forward bytes on a pipe to stream messages"""
while True:
done = False
draining = False
while pipes:
# flush libc's buffers before calling select
libc.fflush(c_stdout_p)
libc.fflush(c_stderr_p)
r, w, x = select.select(pipes, [], [], self.flush_interval)
if not r:
# nothing to read, next iteration will flush and check again
continue
if draining:
# if we are draining and there's nothing to read, stop
break
else:
# nothing to read, next iteration will flush and check again
continue
for pipe in r:
if pipe == self._control_r:
draining = True
os.close(self._control_r)
pipes.remove(self._control_r)
continue
name = names[pipe]
data = os.read(pipe, 1024)
if not data:
# pipe closed, stop polling
# pipe closed, stop polling it
os.close(pipe)
pipes.remove(pipe)
else:
handler = getattr(self, '_handle_%s' % name)
handler(data)
if not pipes:
# pipes closed, we are done
break
# cleanup pipes
[ os.close(pipe) for pipe in pipes ]

self.thread = threading.Thread(target=forwarder)
self.thread.daemon = True
self.thread.start()
Expand All @@ -163,15 +198,17 @@ def __exit__(self, exc_type, exc_value, traceback):
# flush the underlying C buffers
libc.fflush(c_stdout_p)
libc.fflush(c_stderr_p)
# close FDs, signaling output is complete
# signal output is complete on control pipe
os.write(self._control_w, b'\1')
self.thread.join()
os.close(self._control_w)
for real_fd in self._real_fds.values():
os.close(real_fd)
self.thread.join()

# restore original state
for name, real_fd in self._real_fds.items():
save_fd = self._save_fds[name]
os.dup2(save_fd, real_fd)
dup2(save_fd, real_fd)
os.close(save_fd)
# finalize handle
self._finish_handle()
Expand Down

0 comments on commit d3dfa51

Please sign in to comment.