Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download sparse blob #7555

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from .blob_service_client import BlobServiceClient
from .lease import LeaseClient
from ._shared.policies import ExponentialRetry, LinearRetry, NoRetry
from ._shared.downloads import StorageStreamDownloader
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we want to expose it to user I will add it back in the next commit

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this probably should be exposed - it is the return type of the download operations, and if users want to do type-checking they will need to be able to import this type.
I also think it's exposed in the Files SDK; whichever way we go, we should be consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't exposed in File I think, but I assumed we want to expose it

from ._shared.models import(
LocationMode,
ResourceTypes,
Expand Down Expand Up @@ -86,7 +85,6 @@
'BlobPermissions',
'ResourceTypes',
'AccountPermissions',
'StorageStreamDownloader',
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,10 @@ def _str(value):

def _to_utc_datetime(value):
return value.strftime('%Y-%m-%dT%H:%M:%SZ')


def get_empty_chunk(chunk_size):
    # type: (int) -> bytes
    """Return a zero-filled bytes object of length chunk_size.

    Used to synthesize cleared (all-zero) ranges of a sparse page blob
    without making a service call.

    :param int chunk_size: Number of zero bytes to produce.
    :rtype: bytes
    """
    # bytes multiplication allocates once (O(n)); the previous per-byte
    # concatenation loop was quadratic in chunk_size.
    return b'\x00' * chunk_size
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,14 @@ def parse_to_internal_user_delegation_key(service_user_delegation_key):
internal_user_delegation_key.signed_version = service_user_delegation_key.signed_version
internal_user_delegation_key.value = service_user_delegation_key.value
return internal_user_delegation_key


def get_page_ranges_result(ranges):
    # type: (PageList) -> Tuple[List[Dict[str, int]], List[Dict[str, int]]]
    """Convert a generated PageList into (page_ranges, clear_ranges).

    Each range is rendered as a dict with inclusive 'start' and 'end'
    byte offsets. A missing section yields an empty list.

    :param ranges: Generated PageList with optional page_range/clear_range attributes.
    :returns: Tuple of (valid page ranges, cleared ranges).
    """
    # NOTE: the type comment previously used call syntax Tuple(...); generic
    # types must be subscripted with brackets for type checkers to parse them.
    page_range = []  # type: ignore
    clear_range = []  # type: List
    if ranges.page_range:
        page_range = [{'start': b.start, 'end': b.end} for b in ranges.page_range]  # type: ignore
    if ranges.clear_range:
        clear_range = [{'start': b.start, 'end': b.end} for b in ranges.clear_range]
    return page_range, clear_range  # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
# --------------------------------------------------------------------------

from .._shared.policies_async import ExponentialRetry, LinearRetry, NoRetry
from .._shared.downloads_async import StorageStreamDownloader
from .._shared.models import(
LocationMode,
ResourceTypes,
Expand Down Expand Up @@ -82,5 +81,4 @@
'BlobPermissions',
'ResourceTypes',
'AccountPermissions',
'StorageStreamDownloader',
]
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@

from .._shared.base_client_async import AsyncStorageAccountHostsMixin
from .._shared.policies_async import ExponentialRetry
from .._shared.downloads_async import StorageStreamDownloader
from .._shared.response_handlers import return_response_headers, process_storage_error
from .._shared.response_handlers import return_response_headers, process_storage_error, get_page_ranges_result
from .._generated.aio import AzureBlobStorage
from .._generated.models import ModifiedAccessConditions, StorageErrorException
from .._deserialize import deserialize_blob_properties
Expand All @@ -28,6 +27,7 @@
from ..models import BlobType, BlobBlock
from ..lease import get_access_conditions
from .lease_async import LeaseClient
from .download_async import StorageStreamDownloader

if TYPE_CHECKING:
from datetime import datetime
Expand Down Expand Up @@ -1377,7 +1377,7 @@ async def get_page_ranges( # type: ignore
ranges = await self._client.page_blob.get_page_ranges(**options)
except StorageErrorException as error:
process_storage_error(error)
return self._get_page_ranges_result(ranges)
return get_page_ranges_result(ranges)

@distributed_trace_async
async def set_sequence_number( # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
# license information.
# --------------------------------------------------------------------------

import sys
import asyncio
import sys
from io import BytesIO
from itertools import islice

from azure.core.exceptions import HttpResponseError

from .request_handlers import validate_and_format_range_headers
from .response_handlers import process_storage_error, parse_length_from_content_range
from .encryption import decrypt_blob
from .downloads import process_range_and_offset
from azure.core import HttpResponseError
from .._shared.encryption import decrypt_blob
from .._shared.parser import get_empty_chunk
from .._shared.request_handlers import validate_and_format_range_headers
from .._shared.response_handlers import process_storage_error, parse_length_from_content_range, \
get_page_ranges_result
from ..download import process_range_and_offset


async def process_content(data, start_offset, end_offset, encryption):
Expand Down Expand Up @@ -42,7 +43,8 @@ async def process_content(data, start_offset, end_offset, encryption):
class _AsyncChunkDownloader(object): # pylint: disable=too-many-instance-attributes

def __init__(
self, service=None,
self, client=None,
non_empty_ranges=None,
total_size=None,
chunk_size=None,
current_progress=None,
Expand All @@ -54,7 +56,9 @@ def __init__(
encryption_options=None,
**kwargs):

self.service = service
self.client = client

self.non_empty_ranges = non_empty_ranges

# information on the download range/chunk size
self.chunk_size = chunk_size
Expand Down Expand Up @@ -121,34 +125,52 @@ async def _write_to_stream(self, chunk_data, chunk_start):
else:
self.stream.write(chunk_data)

def _do_optimize(self, given_range_start, given_range_end):
    # Decide whether the requested chunk can be skipped entirely.
    # Returns True when the range does not intersect any non-empty page
    # range (i.e. the chunk is all zeros and need not be fetched).
    # NOTE(review): assumes self.non_empty_ranges is sorted ascending by
    # 'start' (as returned by get_page_ranges) -- confirm with caller.
    if self.non_empty_ranges is None:
        # No sparse-range information available; always download.
        return False

    for valid_range in self.non_empty_ranges:
        if given_range_end < valid_range['start']:
            # Every remaining range starts after ours ends: no overlap.
            return True
        if valid_range['end'] >= given_range_start:
            # This non-empty range overlaps ours: must download.
            return False
        # Otherwise this range ends before ours starts; keep scanning.

    return True

async def _download_chunk(self, chunk_start, chunk_end):
    """Download one chunk, skipping the service call for known-empty ranges.

    :param int chunk_start: Inclusive start offset of the chunk.
    :param int chunk_end: Exclusive end offset of the chunk.
    :returns: The (decrypted, if applicable) chunk bytes.
    """
    download_range, offset = process_range_and_offset(
        chunk_start, chunk_end, chunk_end, self.encryption_options)

    # Sparse page blob optimization: if the chunk lies entirely within a
    # cleared range, synthesize zeros locally instead of calling the service.
    if self._do_optimize(download_range[0], download_range[1] - 1):
        chunk_data = get_empty_chunk(self.chunk_size)
    else:
        range_header, range_validation = validate_and_format_range_headers(
            download_range[0],
            download_range[1] - 1,
            check_content_md5=self.validate_content)
        try:
            _, response = await self.client.download(
                range=range_header,
                range_get_content_md5=range_validation,
                validate_content=self.validate_content,
                data_stream_total=self.total_size,
                download_stream_current=self.progress_total,
                **self.request_options)
        except HttpResponseError as error:
            process_storage_error(error)

        chunk_data = await process_content(response, offset[0], offset[1], self.encryption_options)

        # This makes sure that if_match is set so that we can validate
        # that subsequent downloads are to an unmodified blob
        if self.request_options.get('modified_access_conditions'):
            self.request_options['modified_access_conditions'].if_match = response.properties.etag

    return chunk_data


class StorageStreamDownloader(object): # pylint: disable=too-many-instance-attributes
"""A streaming object to download from Azure Storage.

Expand All @@ -157,14 +179,16 @@ class StorageStreamDownloader(object): # pylint: disable=too-many-instance-attr
"""

def __init__(
self, service=None,
self, client=None,
Copy link
Contributor Author

@xiafu-msft xiafu-msft Oct 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you are fine with separating download.py for file and blob, we can remove client from the constructor and only keep clients. (Currently client is used to perform download in blob_operations; clients is used to get the page_blob client and perform the get_page_ranges operation.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we should do some refactoring here - having both client and clients seems odd.
Though if it's not public facing it can be refactored at a later date.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than have a public attribute called clients - could we possibly rename this to something like generated_api and make it private?
That way if we refactor or rename it later, it's already private and we can do what we like with it. I also feel the name clients is confusing.

def __init__(client=None, generated_api=None, ..., **kwargs):
    self.client = client
    self._generated_api = generated_api

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just keep clients and remove client?

clients=None,
config=None,
offset=None,
length=None,
validate_content=None,
encryption_options=None,
**kwargs):
self.service = service
self.client = client
self.clients = clients
self.config = config
self.offset = offset
self.length = length
Expand Down Expand Up @@ -192,6 +216,7 @@ def __init__(
initial_request_start, initial_request_end, self.length, self.encryption_options)
self.download_size = None
self.file_size = None
self.non_empty_ranges = None
self.response = None
self.properties = None

Expand All @@ -218,7 +243,8 @@ async def __anext__(self):
# Use the length unless it is over the end of the file
data_end = min(self.file_size, self.length + 1)
self._iter_downloader = _AsyncChunkDownloader(
service=self.service,
client=self.client,
non_empty_ranges=self.non_empty_ranges,
total_size=self.download_size,
chunk_size=self.config.max_chunk_get_size,
current_progress=self.first_get_size,
Expand Down Expand Up @@ -274,7 +300,7 @@ async def _initial_request(self):
check_content_md5=self.validate_content)

try:
location_mode, response = await self.service.download(
location_mode, response = await self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
Expand Down Expand Up @@ -303,7 +329,7 @@ async def _initial_request(self):
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = await self.service.download(
_, response = await self.client.download(
validate_content=self.validate_content,
data_stream_total=0,
download_stream_current=0,
Expand All @@ -317,6 +343,14 @@ async def _initial_request(self):
else:
process_storage_error(error)

# get page ranges to optimize downloading sparse page blob
if response.properties.blob_type == 'PageBlob':
try:
page_ranges = await self.clients.page_blob.get_page_ranges()
self.non_empty_ranges = get_page_ranges_result(page_ranges)[0]
except HttpResponseError:
pass

# If the file is small, the download is complete at this point.
# If file size is large, download the rest of the file in chunks.
if response.properties.size != self.download_size:
Expand All @@ -328,32 +362,32 @@ async def _initial_request(self):
self._download_complete = True
return response

async def content_as_bytes(self, max_concurrency=1):
    """Download the contents of this file.

    This operation is blocking until all data is downloaded.

    :param int max_concurrency:
        The number of parallel connections with which to download.
    :rtype: bytes
    """
    stream = BytesIO()
    await self.download_to_stream(stream, max_concurrency=max_concurrency)
    return stream.getvalue()

async def content_as_text(self, max_concurrency=1, encoding='UTF-8'):
    """Download the contents of this file, and decode as text.

    This operation is blocking until all data is downloaded.

    :param int max_concurrency:
        The number of parallel connections with which to download.
    :param str encoding: The text encoding used to decode the downloaded bytes.
    :rtype: str
    """
    content = await self.content_as_bytes(max_concurrency=max_concurrency)
    return content.decode(encoding)

async def download_to_stream(self, stream, max_connections=1):
async def download_to_stream(self, stream, max_concurrency=1):
"""Download the contents of this file to a stream.

:param stream:
Expand All @@ -367,7 +401,7 @@ async def download_to_stream(self, stream, max_connections=1):
raise ValueError("Stream is currently being iterated.")

# the stream must be seekable if parallel download is required
parallel = max_connections > 1
parallel = max_concurrency > 1
if parallel:
error_message = "Target stream handle must be seekable."
if sys.version_info >= (3,) and not stream.seekable():
Expand Down Expand Up @@ -396,7 +430,8 @@ async def download_to_stream(self, stream, max_connections=1):
data_end = min(self.file_size, self.length + 1)

downloader = _AsyncChunkDownloader(
service=self.service,
client=self.client,
non_empty_ranges=self.non_empty_ranges,
total_size=self.download_size,
chunk_size=self.config.max_chunk_get_size,
current_progress=self.first_get_size,
Expand All @@ -412,7 +447,7 @@ async def download_to_stream(self, stream, max_connections=1):
dl_tasks = downloader.get_chunk_offsets()
running_futures = [
asyncio.ensure_future(downloader.process_chunk(d))
for d in islice(dl_tasks, 0, max_connections)
for d in islice(dl_tasks, 0, max_concurrency)
]
while running_futures:
# Wait for some download to finish before adding a new one
Expand Down
Loading