diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index 9cc7c69..89f5137 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -267,6 +267,7 @@ def _open_process(self): # data continuously to the process stdin on another thread. self.in_thread = threading.Thread(target=self._feed_pipe) self.in_thread.start() + self._process_explicitly_terminated = False self._file: BinaryIO = self.process.stdout # type: ignore self._wait_for_output_or_process_exit() self._raise_if_error() @@ -290,7 +291,11 @@ def _feed_pipe(self): if chunk == b"": self.in_pipe.close() return - self.in_pipe.write(chunk) + try: + self.in_pipe.write(chunk) + except BrokenPipeError: + if not self._process_explicitly_terminated: + raise finally: self.in_pipe.close() @@ -329,14 +334,15 @@ def close(self) -> None: return check_allowed_code_and_message = False if "r" in self._mode: - self._feeding = False - self._file.read() retcode = self.process.poll() if retcode is None: # still running + self._process_explicitly_terminated = True self.process.terminate() check_allowed_code_and_message = True self.process.wait() + self._feeding = False + self._file.read() if self.in_thread: self.in_thread.join() self._file.close() diff --git a/tests/conftest.py b/tests/conftest.py index 33ad423..2a63905 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,10 +10,12 @@ def create_large_file(tmp_path): def _create_large_file(extension): path = tmp_path / f"large{extension}" - random_text = "".join(random.choices(string.ascii_lowercase, k=1024)) - # Make the text a lot bigger in order to ensure that it is larger than the - # pipe buffer size. - random_text *= 2048 + random.seed(0) + chars = string.ascii_lowercase + "\n" + # Do not decrease this length. The generated file needs to have + # a certain length after compression to trigger some bugs + # (in particular, 512 kB is not sufficient). + random_text = "".join(random.choices(chars, k=1024 * 1024)) with xopen(path, "w") as f: f.write(random_text) return path diff --git a/tests/test_piped.py b/tests/test_piped.py index ebd5aa3..9f8afbe 100644 --- a/tests/test_piped.py +++ b/tests/test_piped.py @@ -6,7 +6,6 @@ import os import shutil import sys -import time import pytest from pathlib import Path from itertools import cycle @@ -189,8 +188,6 @@ def test_reader_close(reader, create_large_file): large_file, "rb", program_settings=program_settings ) as f: f.readline() - time.sleep(0.2) - # The subprocess should be properly terminated now def test_invalid_gzip_compression_level(gzip_writer, tmp_path):