Skip to content

Commit

Permalink
[Storage]fix retry on large block upload (#17909)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiafu-msft authored Apr 20, 2021
1 parent 68095e0 commit 390e17c
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ def read(self, size=None):
raise IOError("Stream failed to seek to the desired location.")
buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size)
else:
absolute_position = self._stream_begin_index + self._position
# It's possible that there's connection problem during data transfer,
# so when we retry we don't want to read from current position of wrapped stream,
# instead we should seek to where we want to read from.
if self._wrapped_stream.tell() != absolute_position:
self._wrapped_stream.seek(absolute_position, SEEK_SET)

buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size)

if buffer_from_stream:
Expand Down
27 changes: 27 additions & 0 deletions sdk/storage/azure-storage-blob/tests/test_largest_block_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from _shared.testcase import StorageTestCase, GlobalStorageAccountPreparer

# ------------------------------------------------------------------------------
from azure.storage.blob._shared.uploads import SubStream

TEST_BLOB_PREFIX = 'largestblob'
LARGEST_BLOCK_SIZE = 4000 * 1024 * 1024
LARGEST_SINGLE_UPLOAD_SIZE = 5000 * 1024 * 1024
Expand Down Expand Up @@ -207,6 +209,31 @@ def test_create_largest_blob_from_path(self, resource_group, location, storage_a
# Assert
self._teardown(FILE_PATH)

def test_substream_for_single_thread_upload_large_block(self):
FILE_PATH = 'largest_blob_from_path.temp.{}.dat'.format(str(uuid.uuid4()))
with open(FILE_PATH, 'wb') as stream:
largeStream = LargeStream(LARGEST_BLOCK_SIZE, 100 * 1024 * 1024)
chunk = largeStream.read()
while chunk:
stream.write(chunk)
chunk = largeStream.read()

with open(FILE_PATH, 'rb') as stream:
substream = SubStream(stream, 0, 2 * 1024 * 1024, None)
# this is to mimic stage large block: SubStream.read() is getting called by http client
data1 = substream.read(2 * 1024 * 1024)
substream.read(2 * 1024 * 1024)
substream.read(2 * 1024 * 1024)

# this is to mimic rewinding request body after connection error
substream.seek(0)

# this is to mimic retry: stage that large block from beginning
data2 = substream.read(2 * 1024 * 1024)

self.assertEqual(data1, data2)
self._teardown(FILE_PATH)

@pytest.mark.live_test_only
@GlobalStorageAccountPreparer()
def test_create_largest_blob_from_path_without_network(self, resource_group, location, storage_account, storage_account_key):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ def read(self, size=None):
raise IOError("Stream failed to seek to the desired location.")
buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size)
else:
absolute_position = self._stream_begin_index + self._position
# It's possible that there's connection problem during data transfer,
# so when we retry we don't want to read from current position of wrapped stream,
# instead we should seek to where we want to read from.
if self._wrapped_stream.tell() != absolute_position:
self._wrapped_stream.seek(absolute_position, SEEK_SET)

buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size)

if buffer_from_stream:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ def read(self, size=None):
raise IOError("Stream failed to seek to the desired location.")
buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size)
else:
absolute_position = self._stream_begin_index + self._position
# It's possible that there's connection problem during data transfer,
# so when we retry we don't want to read from current position of wrapped stream,
# instead we should seek to where we want to read from.
if self._wrapped_stream.tell() != absolute_position:
self._wrapped_stream.seek(absolute_position, SEEK_SET)

buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size)

if buffer_from_stream:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ def read(self, size=None):
raise IOError("Stream failed to seek to the desired location.")
buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size)
else:
absolute_position = self._stream_begin_index + self._position
# It's possible that there's connection problem during data transfer,
# so when we retry we don't want to read from current position of wrapped stream,
# instead we should seek to where we want to read from.
if self._wrapped_stream.tell() != absolute_position:
self._wrapped_stream.seek(absolute_position, SEEK_SET)

buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size)

if buffer_from_stream:
Expand Down

0 comments on commit 390e17c

Please sign in to comment.