-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(BrokenPipeError
): fixes #1591
#3233
Conversation
To test the function we need to be able to simulate Here are some utility functions for creating named pipes import errno
import os
import sys
import threading
import time
def create_fifo(fifo_name):
try:
os.mkfifo(fifo_name)
except FileExistsError:
pass # FIFO already exists
def remove_fifo(fifo_name):
try:
os.remove(fifo_name)
except OSError:
pass # Handle error if needed
def read_from_fifo(fifo_name, num_lines):
with open(fifo_name, "r") as fifo:
for _ in range(num_lines):
if fifo.readline() == "":
break # Exit if pipe is closed or EOF
def write_to_fifo_unsafely(fifo_name, num_lines):
with open(fifo_name, "w") as fifo:
for i in range(num_lines):
print(f"Line {i}", file=fifo, flush=True)
time.sleep(0.1)
def write_to_fifo_safely(fifo_name, num_lines):
message = ""
try:
write_to_fifo_unsafely(fifo_name, num_lines)
except BrokenPipeError:
message = "Broken pipe encountered with BrokenPipeError."
except OSError as e:
if e.errno != errno.EPIPE:
raise
message = "Broken pipe encountered with OSError."
return message and here is a pytest test that confirms this approach raises a import tempfile
from threading import Thread
import pytest
from pytest_pipe_utils import (
create_fifo,
read_from_fifo,
remove_fifo,
write_to_fifo_safely,
write_to_fifo_unsafely,
)
@pytest.fixture
def fifo_name():
with tempfile.NamedTemporaryFile() as tmp:
fifo_path = tmp.name
create_fifo(fifo_path)
yield fifo_path
remove_fifo(fifo_path)
def test_write_to_fifo_safely(fifo_name):
reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
reader_thread.start()
message = write_to_fifo_safely(fifo_name, 10)
reader_thread.join()
assert "Broken pipe encountered" in message
def test_write_to_fifo_unsafely(fifo_name):
with pytest.raises(BrokenPipeError):
reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
reader_thread.start()
message = write_to_fifo_unsafely(fifo_name, 10)
reader_thread.join() Results:
|
To edit these tests for rich, we can just pass in a console and use its print statement instead.
import errno
import os
import sys
import threading
import time
def create_fifo(fifo_name):
try:
os.mkfifo(fifo_name)
except FileExistsError:
pass # FIFO already exists
def remove_fifo(fifo_name):
try:
os.remove(fifo_name)
except OSError:
pass # Handle error if needed
def read_from_fifo(fifo_name, num_lines):
with open(fifo_name, "r") as fifo:
for _ in range(num_lines):
if fifo.readline() == "":
break # Exit if pipe is closed or EOF
def write_to_fifo_unsafely(console, fifo_name, num_lines):
with open(fifo_name, "w") as fifo:
console.file = fifo
for i in range(num_lines):
console.print(f"Line {i}")
time.sleep(0.1)
def write_to_fifo_safely(console, fifo_name, num_lines):
message = ""
try:
write_to_fifo_unsafely(console, fifo_name, num_lines)
except BrokenPipeError:
message = "Broken pipe encountered with BrokenPipeError."
except OSError as e:
if e.errno != errno.EPIPE:
raise
message = "Broken pipe encountered with OSError."
return message
import tempfile
from threading import Thread
import pytest
from rich.console import Console
from pytest_rich_pipe_utils import (
create_fifo,
read_from_fifo,
remove_fifo,
write_to_fifo_safely,
write_to_fifo_unsafely,
)
@pytest.fixture
def fifo_name():
with tempfile.NamedTemporaryFile() as tmp:
fifo_path = tmp.name
create_fifo(fifo_path)
yield fifo_path
remove_fifo(fifo_path)
@pytest.fixture
def console():
"""Create a Console instance"""
return Console()
def test_write_to_fifo_safely(console, fifo_name):
reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
reader_thread.start()
message = write_to_fifo_safely(console, fifo_name, 10)
reader_thread.join()
assert "Broken pipe encountered" in message
def test_write_to_fifo_unsafely(console, fifo_name):
with pytest.raises(BrokenPipeError):
reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
reader_thread.start()
message = write_to_fifo_unsafely(console, fifo_name, 10)
reader_thread.join() Result:
|
Our test for this doesn't need to include the "safe" part to do with checking the message, pytest can handle the error raised (or not), so I'll just simplify it in the test I contribute now. The test should therefore be just the example above but without the Also, I won't use the fixture because it's not used in other tests. |
Well, 2 hours later and I don't think it's possible to reproduce a It always gets captured and captured as a |
Here is a test for regular print, not
import socket
import threading
import time
from queue import Queue
import pytest
class Console:
def __init__(self, file):
self.file = file
def print(self, msg):
print(msg, file=self.file, flush=True)
def create_socket_pair():
# Create a pair of connected sockets in blocking mode
return socket.socketpair()
def socket_writer(sock, num_messages, exception_queue):
file_like_socket = sock.makefile("w")
console = Console(file=file_like_socket)
try:
for i in range(num_messages):
message = f"Message {i}"
console.print(message) # Using console.print to write formatted message
time.sleep(0.1) # simulate time delay between messages
except BrokenPipeError as exc:
exception_queue.put(exc)
print("Broken pipe detected in writer.")
finally:
try:
file_like_socket.close()
except BrokenPipeError as exc:
exception_queue.put(exc)
print("Broken pipe detected in writer shutdown.")
finally:
sock.close()
def socket_reader(sock, num_messages_to_read, exception_queue):
try:
for _ in range(num_messages_to_read):
data = sock.recv(1024)
if not data:
break
except BrokenPipeError as exc:
exception_queue.put(exc)
print("Broken pipe detected in reader.")
finally:
sock.close()
def test_console_socket_pair():
# Creating socket pair
sock_writer, sock_reader = create_socket_pair()
excs = Queue()
# Starting threads
writer_thread = threading.Thread(target=socket_writer, args=(sock_writer, 20, excs))
# Read only 10 messages
reader_thread = threading.Thread(target=socket_reader, args=(sock_reader, 10, excs))
with pytest.raises(BrokenPipeError):
reader_thread.start()
writer_thread.start()
reader_thread.join()
writer_thread.join()
if not excs.empty():
raise excs.get() |
Request for review @willmcgugan Turned out it was basically impossible to capture this I decided to keep the status code as 1 ( Doing it this way means the test will have to check its platform but I see you already do that for Windows. I added an early return to make it clear that the extra block of checks is purely there so as to "bail out" if the platform isn't right. |
I think this is a good step forward, but I'm uncertain if this is the fix (or at least all of it). The docs suggest that this can happen on writes as well as flushes. It also seems disruptive for a library to explicitly exit the app. Maybe there should be a method on Console that is called when a broken pip is received. The default behavior could raise an exception, but the dev could implement different behavior if desired. Either way, it should be documented. |
At least, I have a working snippet of code that reliably recreates the problem in #3400. |
It would be nice to have more input on the desired behavior; I mostly agree with Will. As for a import os
import pytest
from rich.console import Console
def test_brokenpipeerror():
r_pipe, w_pipe = os.pipe()
console = Console(file=open(w_pipe, "w"))
os.close(r_pipe)
with pytest.raises(BrokenPipeError):
console.print("This line causes a BrokenPipeError.")
if __name__ == "__main__":
test_brokenpipeerror() |
Closed in favor of #3468 |
Type of changes
Checklist
Description
Fixes the following bug where piping to unix utils that truncate the output, e.g.
head
, raises an uncaughtBrokenPipeError
:Please describe your changes here. If this fixes a bug, please link to the issue, if possible.
Verifying the fix:
Then invoke it:
python broken_pipes_test.py | head -2
is OK buthead -1
throws the error.Current result
PR result
Status
Incomplete: