From 85b9433586905d36ab711e4f76601b3b1c9731ff Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 22 Jun 2017 16:15:22 +0200 Subject: [PATCH 1/2] use control pipe to signal closure instead of closing pipes which causes EBADFD sometimes also fixes some leaking pipes on each iteration --- wurlitzer.py | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/wurlitzer.py b/wurlitzer.py index 8b52970..bb74bd0 100644 --- a/wurlitzer.py +++ b/wurlitzer.py @@ -90,7 +90,7 @@ def _setup_pipe(self, name): def _decode(self, data): """Decode data, if any - Called before pasing to stdout/stderr streams + Called before passing to stdout/stderr streams """ if self.encoding: data = data.decode(self.encoding, 'replace') @@ -118,10 +118,11 @@ def __enter__(self): libc.fflush(c_stderr_p) # setup handle self._setup_handle() + self._control_r, self._control_w = os.pipe() # create pipe for stdout - pipes = [] - names = {} + pipes = [self._control_r] + names = {self._control_r: 'control'} if self._stdout: pipe = self._setup_pipe('stdout') pipes.append(pipe) @@ -130,22 +131,34 @@ def __enter__(self): pipe = self._setup_pipe('stderr') pipes.append(pipe) names[pipe] = 'stderr' - + def forwarder(): """Forward bytes on a pipe to stream messages""" - while True: + done = False + draining = False + while pipes: # flush libc's buffers before calling select libc.fflush(c_stdout_p) libc.fflush(c_stderr_p) r, w, x = select.select(pipes, [], [], self.flush_interval) if not r: - # nothing to read, next iteration will flush and check again - continue + if draining: + # if we are draining and there's nothing to read, stop + break + else: + # nothing to read, next iteration will flush and check again + continue for pipe in r: + if pipe == self._control_r: + draining = True + os.close(self._control_r) + pipes.remove(self._control_r) + continue name = names[pipe] data = os.read(pipe, 1024) if not data: - # pipe closed, stop polling + # pipe closed, stop polling it + os.close(pipe) pipes.remove(pipe) else: handler = getattr(self, '_handle_%s' % name) @@ -153,6 +166,9 @@ def forwarder(): if not pipes: # pipes closed, we are done break + # cleanup pipes + [ os.close(pipe) for pipe in pipes ] + self.thread = threading.Thread(target=forwarder) self.thread.daemon = True self.thread.start() @@ -163,10 +179,12 @@ def __exit__(self, exc_type, exc_value, traceback): # flush the underlying C buffers libc.fflush(c_stdout_p) libc.fflush(c_stderr_p) - # close FDs, signaling output is complete + # signal output is complete on control pipe + os.write(self._control_w, b'\1') + self.thread.join() + os.close(self._control_w) for real_fd in self._real_fds.values(): os.close(real_fd) - self.thread.join() # restore original state for name, real_fd in self._real_fds.items(): From 6e2e906a8f2ffbd5b27d92a147cd59b0d5008041 Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 22 Jun 2017 16:28:13 +0200 Subject: [PATCH 2/2] retry dup2 on EBUSY --- wurlitzer.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/wurlitzer.py b/wurlitzer.py index bb74bd0..e9c2fec 100644 --- a/wurlitzer.py +++ b/wurlitzer.py @@ -16,12 +16,14 @@ from contextlib import contextmanager import ctypes +import errno from fcntl import fcntl, F_GETFL, F_SETFL import io import os import select import sys import threading +import time import warnings libc = ctypes.CDLL(None) @@ -42,6 +44,23 @@ # don't respect ascii _default_encoding = 'utf8' # pragma: no cover +def dup2(a, b, timeout=3): + """Like os.dup2, but retry on EBUSY""" + dup_err = None + # give FDs 3 seconds to not be busy anymore + for i in range(int(10 * timeout)): + try: + return os.dup2(a, b) + except OSError as e: + dup_err = e + if e.errno == errno.EBUSY: + time.sleep(0.1) + else: + raise + if dup_err: + raise dup_err + + class Wurlitzer(object): """Class for Capturing Process-level FD output via dup2 @@ -78,7 +97,7 @@ def _setup_pipe(self, name): self._save_fds[name] = save_fd pipe_out, pipe_in = os.pipe() - os.dup2(pipe_in, real_fd) + dup2(pipe_in, real_fd) os.close(pipe_in) self._real_fds[name] = real_fd @@ -189,7 +208,7 @@ def __exit__(self, exc_type, exc_value, traceback): # restore original state for name, real_fd in self._real_fds.items(): save_fd = self._save_fds[name] - os.dup2(save_fd, real_fd) + dup2(save_fd, real_fd) os.close(save_fd) # finalize handle self._finish_handle()