From 7b3f154dc6b08172931a28915c1d578c681a375c Mon Sep 17 00:00:00 2001 From: Benjamin Chess Date: Mon, 9 Dec 2013 17:25:39 -0800 Subject: [PATCH] remove QUEUE_TIMEOUT_GET, use a Stop sentinel instead --- awscli/customizations/s3/constants.py | 1 - awscli/customizations/s3/executer.py | 32 ++++++++++++++---------- awscli/customizations/s3/s3handler.py | 6 ++--- tests/unit/customizations/s3/__init__.py | 4 --- 4 files changed, 21 insertions(+), 22 deletions(-) diff --git a/awscli/customizations/s3/constants.py b/awscli/customizations/s3/constants.py index ff100bc6d7a3..d0877eed26b2 100644 --- a/awscli/customizations/s3/constants.py +++ b/awscli/customizations/s3/constants.py @@ -13,7 +13,6 @@ MULTI_THRESHOLD = 8 * (1024 ** 2) CHUNKSIZE = 7 * (1024 ** 2) NUM_THREADS = 10 -QUEUE_TIMEOUT_GET = 1.0 QUEUE_TIMEOUT_WAIT = 0.2 MAX_PARTS = 950 MAX_SINGLE_UPLOAD_SIZE = 5 * (1024 ** 3) diff --git a/awscli/customizations/s3/executer.py b/awscli/customizations/s3/executer.py index 360b64c033e4..6cfb69cf1188 100644 --- a/awscli/customizations/s3/executer.py +++ b/awscli/customizations/s3/executer.py @@ -20,6 +20,7 @@ LOGGER = logging.getLogger(__name__) +QueueEndSentinel = object() class Executer(object): """ @@ -27,12 +28,11 @@ class Executer(object): and cleans up the threads when done. The two type of threads the ``Executer``runs is a worker and a print thread. """ - def __init__(self, done, num_threads, timeout, - result_queue, quiet, interrupt, max_queue_size): + def __init__(self, done, num_threads, result_queue, + quiet, interrupt, max_queue_size): self.queue = None self.done = done self.num_threads = num_threads - self.timeout = timeout self.result_queue = result_queue self.quiet = quiet self.interrupt = interrupt @@ -49,15 +49,13 @@ def num_tasks_failed(self): def start(self): self.print_thread = PrintThread(self.result_queue, self.done, - self.quiet, self.interrupt, - self.timeout) + self.quiet, self.interrupt) self.print_thread.daemon = True self.queue = NoBlockQueue(self.interrupt, maxsize=self._max_queue_size) self.threads_list.append(self.print_thread) self.print_thread.start() for i in range(self.num_threads): - worker = Worker(queue=self.queue, done=self.done, - timeout=self.timeout) + worker = Worker(queue=self.queue, done=self.done) worker.setDaemon(True) self.threads_list.append(worker) worker.start() @@ -80,6 +78,10 @@ def join(self): """ This is used to clean up the ``Executer``. """ + self.result_queue.put(QueueEndSentinel) + for i in range(self.num_threads): + self.queue.put(QueueEndSentinel) + for thread in self.threads_list: thread.join() @@ -89,17 +91,19 @@ class Worker(threading.Thread): This thread is in charge of performing the tasks provided via the main queue ``queue``. """ - def __init__(self, queue, done, timeout): + def __init__(self, queue, done): threading.Thread.__init__(self) # This is the queue where work (tasks) are submitted. self.queue = queue self.done = done - self.timeout = timeout def run(self): while True: try: - function = self.queue.get(True, self.timeout) + function = self.queue.get(True) + if function is QueueEndSentinel: + self.queue.task_done() + break try: function() except Exception as e: @@ -132,14 +136,13 @@ class PrintThread(threading.Thread): deprecated, will be removed in the future). """ - def __init__(self, result_queue, done, quiet, interrupt, timeout): + def __init__(self, result_queue, done, quiet, interrupt): threading.Thread.__init__(self) self._progress_dict = {} self._interrupt = interrupt self._result_queue = result_queue self._done = done self._quiet = quiet - self._timeout = timeout self._progress_length = 0 self._num_parts = 0 self._file_count = 0 @@ -164,7 +167,10 @@ def set_total_files(self, total_files): def run(self): while True: try: - print_task = self._result_queue.get(True, self._timeout) + print_task = self._result_queue.get(True) + if print_task is QueueEndSentinel: + self._result_queue.task_done() + break LOGGER.debug("Received print task: %s", print_task) try: self._process_print_task(print_task) diff --git a/awscli/customizations/s3/s3handler.py b/awscli/customizations/s3/s3handler.py index 498f00d4761b..d01b9989e7ad 100644 --- a/awscli/customizations/s3/s3handler.py +++ b/awscli/customizations/s3/s3handler.py @@ -16,8 +16,7 @@ import threading from awscli.customizations.s3.constants import MULTI_THRESHOLD, CHUNKSIZE, \ - NUM_THREADS, QUEUE_TIMEOUT_GET, MAX_UPLOAD_SIZE, \ - MAX_QUEUE_SIZE + NUM_THREADS, MAX_UPLOAD_SIZE, MAX_QUEUE_SIZE from awscli.customizations.s3.utils import NoBlockQueue, find_chunksize, \ operate, find_bucket_key, relative_path from awscli.customizations.s3.executer import Executer @@ -52,8 +51,7 @@ def __init__(self, session, params, multi_threshold=MULTI_THRESHOLD, self.multi_threshold = multi_threshold self.chunksize = chunksize self.executer = Executer( - done=self.done, num_threads=NUM_THREADS, - timeout=QUEUE_TIMEOUT_GET, result_queue=self.result_queue, + done=self.done, num_threads=NUM_THREADS, result_queue=self.result_queue, quiet=self.params['quiet'], interrupt=self.interrupt, max_queue_size=MAX_QUEUE_SIZE, ) diff --git a/tests/unit/customizations/s3/__init__.py b/tests/unit/customizations/s3/__init__.py index 63698af40e89..90b2b4c7465a 100644 --- a/tests/unit/customizations/s3/__init__.py +++ b/tests/unit/customizations/s3/__init__.py @@ -25,15 +25,11 @@ class S3HandlerBaseTest(unittest.TestCase): shorter amount of time. """ def setUp(self): - get = 'awscli.customizations.s3.constants.QUEUE_TIMEOUT_GET' wait = 'awscli.customizations.s3.constants.QUEUE_TIMEOUT_WAIT' - self.get_timeout_patch = patch(get, 0.01) self.wait_timeout_patch = patch(wait, 0.01) - self.mock_get = self.get_timeout_patch.start() self.mock_wait = self.wait_timeout_patch.start() def tearDown(self): - self.get_timeout_patch.stop() self.wait_timeout_patch.stop()