Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove QUEUE_TIMEOUT_GET, use a Stop sentinel instead #551

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion awscli/customizations/s3/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
MULTI_THRESHOLD = 8 * (1024 ** 2)
CHUNKSIZE = 7 * (1024 ** 2)
NUM_THREADS = 10
QUEUE_TIMEOUT_GET = 1.0
QUEUE_TIMEOUT_WAIT = 0.2
MAX_PARTS = 950
MAX_SINGLE_UPLOAD_SIZE = 5 * (1024 ** 3)
Expand Down
32 changes: 19 additions & 13 deletions awscli/customizations/s3/executer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@

LOGGER = logging.getLogger(__name__)

QueueEndSentinel = object()

class Executer(object):
"""
This class is in charge of all of the threads. It starts up the threads
and cleans up the threads when done. The two type of threads the
``Executer``runs is a worker and a print thread.
"""
def __init__(self, done, num_threads, timeout,
result_queue, quiet, interrupt, max_queue_size):
def __init__(self, done, num_threads, result_queue,
quiet, interrupt, max_queue_size):
self.queue = None
self.done = done
self.num_threads = num_threads
self.timeout = timeout
self.result_queue = result_queue
self.quiet = quiet
self.interrupt = interrupt
Expand All @@ -49,15 +49,13 @@ def num_tasks_failed(self):

def start(self):
self.print_thread = PrintThread(self.result_queue, self.done,
self.quiet, self.interrupt,
self.timeout)
self.quiet, self.interrupt)
self.print_thread.daemon = True
self.queue = NoBlockQueue(self.interrupt, maxsize=self._max_queue_size)
self.threads_list.append(self.print_thread)
self.print_thread.start()
for i in range(self.num_threads):
worker = Worker(queue=self.queue, done=self.done,
timeout=self.timeout)
worker = Worker(queue=self.queue, done=self.done)
worker.setDaemon(True)
self.threads_list.append(worker)
worker.start()
Expand All @@ -80,6 +78,10 @@ def join(self):
"""
This is used to clean up the ``Executer``.
"""
self.result_queue.put(QueueEndSentinel)
for i in range(self.num_threads):
self.queue.put(QueueEndSentinel)

for thread in self.threads_list:
thread.join()

Expand All @@ -89,17 +91,19 @@ class Worker(threading.Thread):
This thread is in charge of performing the tasks provided via
the main queue ``queue``.
"""
def __init__(self, queue, done, timeout):
def __init__(self, queue, done):
threading.Thread.__init__(self)
# This is the queue where work (tasks) are submitted.
self.queue = queue
self.done = done
self.timeout = timeout

def run(self):
while True:
try:
function = self.queue.get(True, self.timeout)
function = self.queue.get(True)
if function is QueueEndSentinel:
self.queue.task_done()
break
try:
function()
except Exception as e:
Expand Down Expand Up @@ -132,14 +136,13 @@ class PrintThread(threading.Thread):
deprecated, will be removed in the future).

"""
def __init__(self, result_queue, done, quiet, interrupt, timeout):
def __init__(self, result_queue, done, quiet, interrupt):
threading.Thread.__init__(self)
self._progress_dict = {}
self._interrupt = interrupt
self._result_queue = result_queue
self._done = done
self._quiet = quiet
self._timeout = timeout
self._progress_length = 0
self._num_parts = 0
self._file_count = 0
Expand All @@ -164,7 +167,10 @@ def set_total_files(self, total_files):
def run(self):
while True:
try:
print_task = self._result_queue.get(True, self._timeout)
print_task = self._result_queue.get(True)
if print_task is QueueEndSentinel:
self._result_queue.task_done()
break
LOGGER.debug("Received print task: %s", print_task)
try:
self._process_print_task(print_task)
Expand Down
6 changes: 2 additions & 4 deletions awscli/customizations/s3/s3handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import threading

from awscli.customizations.s3.constants import MULTI_THRESHOLD, CHUNKSIZE, \
NUM_THREADS, QUEUE_TIMEOUT_GET, MAX_UPLOAD_SIZE, \
MAX_QUEUE_SIZE
NUM_THREADS, MAX_UPLOAD_SIZE, MAX_QUEUE_SIZE
from awscli.customizations.s3.utils import NoBlockQueue, find_chunksize, \
operate, find_bucket_key, relative_path
from awscli.customizations.s3.executer import Executer
Expand Down Expand Up @@ -52,8 +51,7 @@ def __init__(self, session, params, multi_threshold=MULTI_THRESHOLD,
self.multi_threshold = multi_threshold
self.chunksize = chunksize
self.executer = Executer(
done=self.done, num_threads=NUM_THREADS,
timeout=QUEUE_TIMEOUT_GET, result_queue=self.result_queue,
done=self.done, num_threads=NUM_THREADS, result_queue=self.result_queue,
quiet=self.params['quiet'], interrupt=self.interrupt,
max_queue_size=MAX_QUEUE_SIZE,
)
Expand Down
4 changes: 0 additions & 4 deletions tests/unit/customizations/s3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,11 @@ class S3HandlerBaseTest(unittest.TestCase):
shorter amount of time.
"""
def setUp(self):
get = 'awscli.customizations.s3.constants.QUEUE_TIMEOUT_GET'
wait = 'awscli.customizations.s3.constants.QUEUE_TIMEOUT_WAIT'
self.get_timeout_patch = patch(get, 0.01)
self.wait_timeout_patch = patch(wait, 0.01)
self.mock_get = self.get_timeout_patch.start()
self.mock_wait = self.wait_timeout_patch.start()

def tearDown(self):
self.get_timeout_patch.stop()
self.wait_timeout_patch.stop()


Expand Down