Skip to content

Commit

Permalink
propagate context
Browse files Browse the repository at this point in the history
  • Loading branch information
SuyogSoti committed Jul 25, 2019
1 parent 5b058c7 commit 1bd07ed
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from azure.core.exceptions import HttpResponseError

from azure.core.tracing.context import tracing_context
from .request_handlers import validate_and_format_range_headers
from .response_handlers import process_storage_error, parse_length_from_content_range
from .encryption import decrypt_blob
Expand Down Expand Up @@ -454,7 +455,7 @@ def download_to_stream(self, stream, max_connections=1):
if max_connections > 1:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_connections)
list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
list(executor.map(tracing_context.with_current_context(downloader.process_chunk), downloader.get_chunk_offsets()))
else:
for chunk in downloader.get_chunk_offsets():
downloader.process_chunk(chunk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import six

from azure.core.tracing.context import tracing_context
from . import encode_base64, url_quote
from .request_handlers import get_length
from .response_handlers import return_response_headers
Expand All @@ -34,7 +35,7 @@ def _parallel_uploads(executor, uploader, pending, running):
except StopIteration:
break
else:
running.add(executor.submit(uploader.process_chunk, next_chunk))
running.add(executor.submit(tracing_context.with_current_context(uploader.process_chunk), next_chunk))

# Wait for the remaining uploads to finish
done, _running = futures.wait(running)
Expand Down Expand Up @@ -79,7 +80,7 @@ def upload_data_chunks(
executor = futures.ThreadPoolExecutor(max_connections)
upload_tasks = uploader.get_chunk_streams()
running_futures = [
executor.submit(uploader.process_chunk, u)
executor.submit(tracing_context.with_current_context(uploader.process_chunk), u)
for u in islice(upload_tasks, 0, max_connections)
]
range_ids = _parallel_uploads(executor, uploader, upload_tasks, running_futures)
Expand Down Expand Up @@ -115,7 +116,7 @@ def upload_substream_blocks(
executor = futures.ThreadPoolExecutor(max_connections)
upload_tasks = uploader.get_substream_blocks()
running_futures = [
executor.submit(uploader.process_substream_block, u)
executor.submit(tracing_context.with_current_context(uploader.process_substream_block), u)
for u in islice(upload_tasks, 0, max_connections)
]
return _parallel_uploads(executor, uploader, upload_tasks, running_futures)
Expand Down

0 comments on commit 1bd07ed

Please sign in to comment.