Trace storage queue (#6449)
* decorators and policy added

* properly clear context

* don't decorate private stuff

* added policy

* decorated async

* get rid of those that don't make network calls

* propagate context
SuyogSoti authored Jul 25, 2019
1 parent 6296b96 commit e08bfc6
Showing 9 changed files with 246 additions and 181 deletions.
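
The substance of the change is registering azure-core's DistributedTracingPolicy at the end of the storage clients' sync and async pipelines, as the diffs below show. A minimal standalone sketch of the same pattern, assuming only the imports that appear in this diff (the other policies in the list are illustrative placeholders, not the full storage policy chain):

# Sketch: appending DistributedTracingPolicy to an azure-core pipeline,
# using the import path from this commit (azure-core, mid-2019).
from azure.core.pipeline import Pipeline
from azure.core.pipeline.transport import RequestsTransport
from azure.core.pipeline.policies import ContentDecodePolicy, RedirectPolicy
from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy

policies = [
    ContentDecodePolicy(),
    RedirectPolicy(),
    DistributedTracingPolicy(),  # records a span per outgoing HTTP request in the ambient trace context
]
pipeline = Pipeline(RequestsTransport(), policies=policies)

The policy is appended at the end of the list here, mirroring where the commit places it in both _create_pipeline implementations.
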
@@ -22,7 +22,7 @@
 
 
 _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024
-_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.'
+_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = "{0} should be a seekable file-like/io.IOBase type stream object."
 
 
 def _parallel_uploads(executor, uploader, pending, running):
@@ -152,7 +152,7 @@ def __init__(self, service, total_size, chunk_size, stream, parallel, encryptor=
     def get_chunk_streams(self):
         index = 0
         while True:
-            data = b''
+            data = b""
             read_size = self.chunk_size
 
             # Buffer until we either reach the end of the stream or get a whole chunk.
@@ -161,12 +161,12 @@ def get_chunk_streams(self):
                     read_size = min(self.chunk_size - len(data), self.total_size - (index + len(data)))
                 temp = self.stream.read(read_size)
                 if not isinstance(temp, six.binary_type):
-                    raise TypeError('Blob data should be of type bytes.')
+                    raise TypeError("Blob data should be of type bytes.")
                 data += temp or b""
 
                 # We have read an empty string and so are at the end
                 # of the buffer or we have read a full chunk.
-                if temp == b'' or len(data) == self.chunk_size:
+                if temp == b"" or len(data) == self.chunk_size:
                     break
 
             if len(data) == self.chunk_size:
@@ -249,7 +249,8 @@ def _upload_chunk(self, chunk_offset, chunk_data):
             chunk_data,
             data_stream_total=self.total_size,
             upload_stream_current=self.progress_total,
-            **self.request_options)
+            **self.request_options
+        )
         return block_id
 
     def _upload_substream_block(self, block_id, block_stream):
@@ -260,7 +261,8 @@ def _upload_substream_block(self, block_id, block_stream):
                 block_stream,
                 data_stream_total=self.total_size,
                 upload_stream_current=self.progress_total,
-                **self.request_options)
+                **self.request_options
+            )
         finally:
             block_stream.close()
         return block_id
@@ -272,15 +274,15 @@ def _is_chunk_empty(self, chunk_data):
         # read until non-zero byte is encountered
         # if reached the end without returning, then chunk_data is all 0's
         for each_byte in chunk_data:
-            if each_byte not in [0, b'\x00']:
+            if each_byte not in [0, b"\x00"]:
                 return False
         return True
 
     def _upload_chunk(self, chunk_offset, chunk_data):
         # avoid uploading the empty pages
         if not self._is_chunk_empty(chunk_data):
             chunk_end = chunk_offset + len(chunk_data) - 1
-            content_range = 'bytes={0}-{1}'.format(chunk_offset, chunk_end)
+            content_range = "bytes={0}-{1}".format(chunk_offset, chunk_end)
             computed_md5 = None
             self.response_headers = self.service.upload_pages(
                 chunk_data,
@@ -290,7 +292,8 @@ def _upload_chunk(self, chunk_offset, chunk_data):
                 cls=return_response_headers,
                 data_stream_total=self.total_size,
                 upload_stream_current=self.progress_total,
-                **self.request_options)
+                **self.request_options
+            )
 
             if not self.parallel and self.request_options.get('modified_access_conditions'):
                 self.request_options['modified_access_conditions'].if_match = self.response_headers['etag']
@@ -312,7 +315,7 @@ def _upload_chunk(self, chunk_offset, chunk_data):
                 upload_stream_current=self.progress_total,
                 **self.request_options
             )
-            self.current_length = int(self.response_headers['blob_append_offset'])
+            self.current_length = int(self.response_headers["blob_append_offset"])
         else:
             self.request_options['append_position_access_conditions'].append_position = \
                 self.current_length + chunk_offset
@@ -362,8 +365,9 @@ def __init__(self, wrapped_stream, stream_begin_index, length, lockObj):
 
         # we must avoid buffering more than necessary, and also not use up too much memory
         # so the max buffer size is capped at 4MB
-        self._max_buffer_size = length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE \
-            else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
+        self._max_buffer_size = (
+            length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
+        )
         self._current_buffer_start = 0
         self._current_buffer_size = 0
         super(SubStream, self).__init__()
@@ -393,7 +397,7 @@ def read(self, n):
 
         # return fast
         if n == 0 or self._buffer.closed:
-            return b''
+            return b""
 
         # attempt first read from the read buffer and update position
         read_buffer = self._buffer.read(n)
@@ -449,7 +453,7 @@ def seek(self, offset, whence=0):
             start_index = self._position
         elif whence is SEEK_END:
             start_index = self._length
-            offset = - offset
+            offset = -offset
         else:
             raise ValueError("Invalid argument for the 'whence' parameter.")
 
@@ -492,10 +496,11 @@ class IterStreamer(object):
     """
     File-like streaming iterator.
     """
-    def __init__(self, generator, encoding='UTF-8'):
+
+    def __init__(self, generator, encoding="UTF-8"):
         self.generator = generator
         self.iterator = iter(generator)
-        self.leftover = b''
+        self.leftover = b""
         self.encoding = encoding
 
     def __len__(self):
@@ -20,6 +20,7 @@
 from azure.core import Configuration
 from azure.core.pipeline import Pipeline
 from azure.core.pipeline.transport import RequestsTransport
+from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
 from azure.core.pipeline.policies import (
     RedirectPolicy,
     ContentDecodePolicy,
@@ -175,6 +176,7 @@ def _create_pipeline(self, credential, **kwargs):
             config.retry_policy,
             config.logging_policy,
             StorageResponseHook(**kwargs),
+            DistributedTracingPolicy(),
         ]
         return config, Pipeline(config.transport, policies=policies)
 
@@ -15,6 +15,7 @@
     from azure.core.pipeline.transport import AioHttpTransport as AsyncTransport
 except ImportError:
     from azure.core.pipeline.transport import AsyncioRequestsTransport as AsyncTransport
+from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
 from azure.core.pipeline.policies import (
     ContentDecodePolicy,
     BearerTokenCredentialPolicy,
@@ -81,5 +82,6 @@ def _create_pipeline(self, credential, **kwargs):
             config.retry_policy,
             config.logging_policy,
             AsyncStorageResponseHook(**kwargs),
+            DistributedTracingPolicy(),
         ]
         return config, AsyncPipeline(config.transport, policies=policies)