

Start threads only after calling read and write on ThreadedGzip
This patch reverts the use of daemon threads and postpones launching the threads
until `read` / `write` is called on `ThreadedGzipReader` / `ThreadedGzipWriter`.
This is intended to fix #53.

Signed-off-by: y5c4l3 <[email protected]>
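
The idea behind the patch is a lazy-start guard: worker threads are still created in `__init__`, but they are only started on the first `read`/`write` call and only joined if they were actually started. A minimal standalone sketch of that pattern follows (the `LazyWorker` class and its queue handling are illustrative assumptions, not zlib-ng code):

    import queue
    import threading

    class LazyWorker:
        """Illustrative sketch of the lazy-start pattern; not zlib-ng code."""

        def __init__(self):
            self.queue = queue.Queue(maxsize=2)
            self.running = False
            # The thread object is created eagerly but not started yet.
            self.thread = threading.Thread(target=self._run)

        def _run(self):
            while self.running:
                try:
                    item = self.queue.get(timeout=0.05)
                except queue.Empty:
                    continue
                # ... process item here ...

        def _start(self):
            if not self.running:  # start the thread only once, on first use
                self.running = True
                self.thread.start()

        def _stop(self):
            if self.running:  # only join a thread that was actually started
                self.running = False
                self.thread.join()

        def write(self, item):
            self._start()  # the thread spins up on first write, not in __init__
            self.queue.put(item)

        def close(self):
            self._stop()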
y5c4l3 committed Sep 13, 2024
1 parent 050fb00 commit 1c3f210
Showing 2 changed files with 30 additions and 20 deletions.
49 changes: 29 additions & 20 deletions src/zlib_ng/gzip_ng_threaded.py
@@ -98,11 +98,9 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
         self.exception = None
         self.buffer = io.BytesIO()
         self.block_size = block_size
-        # Using a daemon thread prevents programs freezing on error.
-        self.worker = threading.Thread(target=self._decompress, daemon=True)
+        self.worker = threading.Thread(target=self._decompress)
         self._closed = False
-        self.running = True
-        self.worker.start()
+        self.running = False

     def _check_closed(self, msg=None):
         if self._closed:
@@ -126,8 +124,19 @@ def _decompress(self):
                 except queue.Full:
                     pass

+    def _start(self):
+        if not self.running:
+            self.running = True
+            self.worker.start()
+
+    def _stop(self):
+        if self.running:
+            self.running = False
+            self.worker.join()
+
     def readinto(self, b):
         self._check_closed()
+        self._start()
         result = self.buffer.readinto(b)
         if result == 0:
             while True:
@@ -155,8 +164,7 @@ def tell(self) -> int:
     def close(self) -> None:
         if self._closed:
             return
-        self.running = False
-        self.worker.join()
+        self._stop()
         self.fileobj.close()
         if self.closefd:
             self.raw.close()
@@ -232,18 +240,17 @@ def __init__(self,
                 queue.Queue(queue_size) for _ in range(threads)]
             self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
                 queue.Queue(queue_size) for _ in range(threads)]
-            # Using daemon threads prevents a program freezing on error.
-            self.output_worker = threading.Thread(target=self._write, daemon=True)
+            self.output_worker = threading.Thread(target=self._write)
             self.compression_workers = [
-                threading.Thread(target=self._compress, args=(i,), daemon=True)
+                threading.Thread(target=self._compress, args=(i,))
                 for i in range(threads)
             ]
         elif threads == 1:
             self.input_queues = [queue.Queue(queue_size)]
             self.output_queues = []
             self.compression_workers = []
             self.output_worker = threading.Thread(
-                target=self._compress_and_write, daemon=True)
+                target=self._compress_and_write)
         else:
             raise ValueError(f"threads should be at least 1, got {threads}")
         self.threads = threads
@@ -254,7 +261,6 @@ def __init__(self,
         self.raw, self.closefd = open_as_binary_stream(filename, mode)
         self._closed = False
         self._write_gzip_header()
-        self.start()

     def _check_closed(self, msg=None):
         if self._closed:
@@ -277,21 +283,24 @@ def _write_gzip_header(self):
         self.raw.write(struct.pack(
             "BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl))

-    def start(self):
-        self.running = True
-        self.output_worker.start()
-        for worker in self.compression_workers:
-            worker.start()
+    def _start(self):
+        if not self.running:
+            self.running = True
+            self.output_worker.start()
+            for worker in self.compression_workers:
+                worker.start()

     def stop(self):
         """Stop, but do not care for remaining work"""
-        self.running = False
-        for worker in self.compression_workers:
-            worker.join()
-        self.output_worker.join()
+        if self.running:
+            self.running = False
+            for worker in self.compression_workers:
+                worker.join()
+            self.output_worker.join()

     def write(self, b) -> int:
         self._check_closed()
+        self._start()
         with self.lock:
             if self.exception:
                 raise self.exception
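One likely reason the `running` guards in `_start()` and `stop()` matter: with threads no longer started in `__init__`, an unconditional `join()` (for example from `close()` on a writer that never wrote anything) would target a thread that was never started, which Python rejects. A small standalone illustration of that behaviour (not zlib-ng code):

    import threading

    t = threading.Thread(target=lambda: None)
    try:
        t.join()  # joining a never-started thread is an error
    except RuntimeError as exc:
        print(exc)  # "cannot join thread before it is started"
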
1 change: 1 addition & 0 deletions tests/test_gzip_ng_threaded.py
@@ -105,6 +105,7 @@ def test_threaded_write_error(threads):
         threads=threads, block_size=8 * 1024)
     # Bypass the write method which should not allow blocks larger than
     # block_size.
+    f._start()
     f.input_queues[0].put((os.urandom(1024 * 64), b""))
     with pytest.raises(OverflowError) as error:
         f.close()
