Commit 538cc1b

Merge pull request #161 from pycompression/large-piped
Fix reading from a large compressed file using external process
rhpvorderman authored Jun 12, 2024
2 parents 3e0c4b0 + 74642c5 commit 538cc1b
Showing 3 changed files with 15 additions and 10 deletions.
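
The scenario behind the fix, in short: when xopen reads a compressed file through an external program, a background thread feeds the compressed bytes into that program's stdin. If a reader of a large file is closed after consuming only a small part, the program is still running at close time and gets terminated, and the feeding thread could then fail with a BrokenPipeError. A minimal usage sketch of the affected scenario (illustrative only; it assumes a file named large.gz exists and that xopen delegates decompression to an external program such as pigz or gzip):

from xopen import xopen

# Illustration only: assumes "large.gz" exists and that an external
# decompression program is used under the hood.
with xopen("large.gz", "rb") as f:
    f.readline()  # read only a little; most data is still queued in the pipes
# Closing terminates the still-running subprocess. Before this fix, the thread
# feeding compressed data into its stdin could raise a spurious BrokenPipeError.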
12 changes: 9 additions & 3 deletions src/xopen/__init__.py
@@ -267,6 +267,7 @@ def _open_process(self):
             # data continuously to the process stdin on another thread.
             self.in_thread = threading.Thread(target=self._feed_pipe)
             self.in_thread.start()
+            self._process_explicitly_terminated = False
             self._file: BinaryIO = self.process.stdout  # type: ignore
             self._wait_for_output_or_process_exit()
             self._raise_if_error()
@@ -290,7 +291,11 @@ def _feed_pipe(self):
                 if chunk == b"":
                     self.in_pipe.close()
                     return
-                self.in_pipe.write(chunk)
+                try:
+                    self.in_pipe.write(chunk)
+                except BrokenPipeError:
+                    if not self._process_explicitly_terminated:
+                        raise
         finally:
             self.in_pipe.close()
 
@@ -329,14 +334,15 @@ def close(self) -> None:
             return
         check_allowed_code_and_message = False
         if "r" in self._mode:
+            self._feeding = False
+            self._file.read()
             retcode = self.process.poll()
             if retcode is None:
                 # still running
+                self._process_explicitly_terminated = True
                 self.process.terminate()
                 check_allowed_code_and_message = True
                 self.process.wait()
-            self._feeding = False
-            self._file.read()
             if self.in_thread:
                 self.in_thread.join()
             self._file.close()
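
The pattern is easier to follow outside the diff. The sketch below is a stand-alone illustration rather than xopen's actual class (the name PipedReader, the gzip command line, and the 128 KiB chunk size are assumptions): a feeder thread pipes compressed data into the subprocess's stdin, close() stops the feeder and drains stdout before deciding whether to terminate, and a BrokenPipeError in the feeder is re-raised only if the process was not terminated deliberately.

import subprocess
import threading


class PipedReader:
    """Decompress a binary file object by piping it through an external program."""

    def __init__(self, raw, args=("gzip", "-c", "-d")):
        self._raw = raw  # file object holding the compressed bytes
        self._feeding = True
        self._explicitly_terminated = False
        self.process = subprocess.Popen(
            list(args), stdin=subprocess.PIPE, stdout=subprocess.PIPE
        )
        self._feeder = threading.Thread(target=self._feed)
        self._feeder.start()

    def _feed(self):
        # Background thread: push compressed bytes into the process's stdin.
        try:
            while self._feeding:
                chunk = self._raw.read(128 * 1024)
                if not chunk:
                    return
                try:
                    self.process.stdin.write(chunk)
                except BrokenPipeError:
                    # Only acceptable when close() terminated the process on purpose.
                    if not self._explicitly_terminated:
                        raise
                    return
        finally:
            self.process.stdin.close()

    def read(self, size=-1):
        return self.process.stdout.read(size)

    def close(self):
        self._feeding = False            # let the feeder loop exit
        self.process.stdout.read()       # drain stdout so the process is not blocked
        if self.process.poll() is None:  # still running: stop it deliberately
            self._explicitly_terminated = True
            self.process.terminate()
        self.process.wait()
        self._feeder.join()
        self.process.stdout.close()

Draining stdout before polling mirrors the reordering in close() above: it unblocks the decompressor, and with it the feeder thread, before any decision to terminate is made.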
10 changes: 6 additions & 4 deletions tests/conftest.py
@@ -10,10 +10,12 @@
 def create_large_file(tmp_path):
     def _create_large_file(extension):
         path = tmp_path / f"large{extension}"
-        random_text = "".join(random.choices(string.ascii_lowercase, k=1024))
-        # Make the text a lot bigger in order to ensure that it is larger than the
-        # pipe buffer size.
-        random_text *= 2048
+        random.seed(0)
+        chars = string.ascii_lowercase + "\n"
+        # Do not decrease this length. The generated file needs to have
+        # a certain length after compression to trigger some bugs
+        # (in particular, 512 kB is not sufficient).
+        random_text = "".join(random.choices(chars, k=1024 * 1024))
         with xopen(path, "w") as f:
             f.write(random_text)
         return path
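
For intuition about the size requirement in the comment above: random lowercase text barely compresses, so a 1 MiB input stays far larger than a typical pipe buffer (64 KiB on Linux) even after compression, presumably so the compressed stream overfills the pipes between xopen and the external program. A quick check (illustrative only, using Python's gzip module rather than an external program):

import gzip
import random
import string

random.seed(0)
chars = string.ascii_lowercase + "\n"
text = "".join(random.choices(chars, k=1024 * 1024)).encode()
print(len(gzip.compress(text)))  # several hundred kB, far above a 64 KiB pipe buffer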
3 changes: 0 additions & 3 deletions tests/test_piped.py
@@ -6,7 +6,6 @@
 import os
 import shutil
 import sys
-import time
 import pytest
 from pathlib import Path
 from itertools import cycle
@@ -189,8 +188,6 @@ def test_reader_close(reader, create_large_file):
         large_file, "rb", program_settings=program_settings
     ) as f:
         f.readline()
-    time.sleep(0.2)
-    # The subprocess should be properly terminated now
 
 
 def test_invalid_gzip_compression_level(gzip_writer, tmp_path):
