Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move fflush to a thread #16

Merged
merged 3 commits into from
May 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ htmlcov/
.coverage
.coverage.*
.cache
.pytest_cache
nosetests.xml
coverage.xml
*,cover
Expand Down
19 changes: 14 additions & 5 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ def test_pipes():
with pipes(stdout=PIPE, stderr=PIPE) as (stdout, stderr):
printf(u"Hellø")
printf_err(u"Hi, stdérr")

assert stdout.read() == u"Hellø\n"
assert stderr.read() == u"Hi, stdérr\n"

def test_pipe_bytes():
with pipes(encoding=None) as (stdout, stderr):
printf(u"Hellø")
printf_err(u"Hi, stdérr")

assert stdout.read() == u"Hellø\n".encode('utf8')
assert stderr.read() == u"Hi, stdérr\n".encode('utf8')

Expand All @@ -47,7 +47,7 @@ def test_forward():
printf_err(u"Hi, stdérr")
assert _stdout is stdout
assert _stderr is stderr

assert stdout.getvalue() == u"Hellø\n"
assert stderr.getvalue() == u"Hi, stdérr\n"

Expand All @@ -59,15 +59,15 @@ def test_pipes_stderr():
printf_err(u"Hi, stdérr")
assert _stdout is stdout
assert _stderr is None

assert stdout.getvalue() == u"Hellø\nHi, stdérr\n"

def test_flush():
stdout = io.StringIO()
w = Wurlitzer(stdout=stdout, stderr=STDOUT)
with w:
printf_err(u"Hellø")
time.sleep(1)
time.sleep(0.5)
assert stdout.getvalue().strip() == u"Hellø"

def test_sys_pipes():
Expand Down Expand Up @@ -113,3 +113,12 @@ def test_fd_leak():
with pipes():
print('ok')
assert count_fds() == base_count


def test_buffer_full():
with pipes(stdout=None, stderr=io.StringIO()) as (stdout, stderr):
long_string = "x" * 1000000 # create a very long string
printf_err(long_string)

# Test never reaches here as the process hangs.
assert stderr.getvalue() == long_string + "\n"
56 changes: 43 additions & 13 deletions wurlitzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
from fcntl import fcntl, F_GETFL, F_SETFL
import io
import os
try:
from queue import Queue
except ImportError:
from Queue import Queue
import select
import sys
import threading
Expand All @@ -44,6 +48,7 @@
# don't respect ascii
_default_encoding = 'utf8' # pragma: no cover


def dup2(a, b, timeout=3):
"""Like os.dup2, but retry on EBUSY"""
dup_err = None
Expand All @@ -59,7 +64,7 @@ def dup2(a, b, timeout=3):
raise
if dup_err:
raise dup_err


class Wurlitzer(object):
"""Class for Capturing Process-level FD output via dup2
Expand Down Expand Up @@ -90,7 +95,7 @@ def __init__(self, stdout=None, stderr=None, encoding=_default_encoding):
self._handlers = {}
self._handlers['stderr'] = self._handle_stderr
self._handlers['stdout'] = self._handle_stdout

def _setup_pipe(self, name):
real_fd = getattr(sys, '__%s__' % name).fileno()
save_fd = os.dup(real_fd)
Expand Down Expand Up @@ -126,19 +131,19 @@ def _handle_stderr(self, data):
def _setup_handle(self):
"""Setup handle for output, if any"""
self.handle = (self._stdout, self._stderr)

def _finish_handle(self):
"""Finish handle, if anything should be done when it's all wrapped up."""
pass

def __enter__(self):
# flush anything out before starting
libc.fflush(c_stdout_p)
libc.fflush(c_stderr_p)
# setup handle
self._setup_handle()
self._control_r, self._control_w = os.pipe()

# create pipe for stdout
pipes = [self._control_r]
names = {self._control_r: 'control'}
Expand All @@ -151,21 +156,43 @@ def __enter__(self):
pipes.append(pipe)
names[pipe] = 'stderr'

# flush pipes in a background thread to avoid blocking
# the reader thread when the buffer is full
flush_queue = Queue()

def flush_main():
while True:
msg = flush_queue.get()
if msg == 'stop':
return
libc.fflush(c_stdout_p)
libc.fflush(c_stderr_p)

flush_thread = threading.Thread(target=flush_main)
flush_thread.daemon = True
flush_thread.start()

def forwarder():
"""Forward bytes on a pipe to stream messages"""
done = False
draining = False
flush_interval = 0
while pipes:
# flush libc's buffers before calling select
libc.fflush(c_stdout_p)
libc.fflush(c_stderr_p)
r, w, x = select.select(pipes, [], [], self.flush_interval)
if not r:
r, w, x = select.select(pipes, [], [], flush_interval)
Copy link

@gidim gidim May 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you changeself.flush_interval to flush_interval?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because now it may be self._flush_interval or 0 depending on the previous state.

if r:
# found something to read, don't block select until
# we run out of things to read
flush_interval = 0
else:
# nothing to read
if draining:
# if we are draining and there's nothing to read, stop
break
else:
# nothing to read, next iteration will flush and check again
# nothing to read, get ready to wait.
# flush the streams in case there's something waiting
# to be written.
flush_queue.put('flush')
flush_interval = self.flush_interval
continue
for pipe in r:
if pipe == self._control_r:
Expand All @@ -185,8 +212,11 @@ def forwarder():
if not pipes:
# pipes closed, we are done
break
# stop flush thread
flush_queue.put('stop')
flush_thread.join()
# cleanup pipes
[ os.close(pipe) for pipe in pipes ]
[os.close(pipe) for pipe in pipes]

self.thread = threading.Thread(target=forwarder)
self.thread.daemon = True
Expand Down