Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[draft] working download #38953

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ async def is_checksum_retry(response):
# retry if invalid content md5
if response.context.get('validate_content', False) and response.http_response.headers.get('content-md5'):
try:
await response.http_response.read() # Load the body in memory and close the socket
# Fix Start
await response.http_response.load_body() # Load the body in memory and close the socket
# Fix End
except (StreamClosedError, StreamConsumedError):
pass
computed_md5 = response.http_request.headers.get('content-md5', None) or \
encode_base64(StorageContentValidation.get_content_md5(response.http_response.content))
encode_base64(StorageContentValidation.get_content_md5(response.http_response.body()))
if response.http_response.headers['content-md5'] != computed_md5:
return True
return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,13 @@
async def process_content(data: Any, start_offset: int, end_offset: int, encryption: Dict[str, Any]) -> bytes:
if data is None:
raise ValueError("Response cannot be None.")
await data.response.read()
content = cast(bytes, data.response.content)
# Fix Start
await data.response.load_body() # Load the body in memory and close the socket
# Fix End

# This is where the content is received. Place a breakpoint on the line below.

content = cast(bytes, data.response.body())
if encryption.get('key') is not None or encryption.get('resolver') is not None:
try:
return decrypt_blob(
Expand Down
28 changes: 26 additions & 2 deletions sdk/storage/azure-storage-blob/tests/test_common_blob_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from devtools_testutils.aio import recorded_by_proxy_async
from devtools_testutils.storage.aio import AsyncStorageRecordedTestCase
from settings.testcase import BlobPreparer
from test_helpers_async import AsyncStream
from test_helpers_async import AsyncStream, MockStorageTransport

# ------------------------------------------------------------------------------
TEST_CONTAINER_PREFIX = 'container'
Expand Down Expand Up @@ -187,7 +187,7 @@ async def test_start_copy_from_url_with_oauth(self, **kwargs):

@BlobPreparer()
@recorded_by_proxy_async
async def test_blob_exists(self, **kwargs):
async def test_blob_existseeeeaaaaaaa(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")

Expand All @@ -202,6 +202,30 @@ async def test_blob_exists(self, **kwargs):
# Assert
assert exists

@pytest.mark.live_test_only
@BlobPreparer()
async def test_mock_transport(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")

transport = MockStorageTransport()
blob_service_client = BlobServiceClient(
self.account_url(storage_account_name, "blob"),
credential=storage_account_key,
transport=transport,
retry_total=0
)

blob_client = BlobClient(
blob_service_client.url, container_name='test_cont', blob_name='test_blob', credential=storage_account_key,
transport=transport, retry_total=0)

# content = await blob_client.download_blob()
# assert content is not None

props = await blob_client.get_blob_properties()
assert props is not None

@BlobPreparer()
@recorded_by_proxy_async
async def test_blob_exists_with_if_tags(self, **kwargs):
Expand Down
81 changes: 80 additions & 1 deletion sdk/storage/azure-storage-blob/tests/test_helpers_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
# license information.
# --------------------------------------------------------------------------
from io import IOBase, UnsupportedOperation
from typing import Optional
from typing import Any, Dict, Optional

from azure.core.pipeline.transport import AsyncHttpTransport, AioHttpTransportResponse
from azure.core.rest import AsyncHttpResponse, HttpRequest
from azure.core.rest._aiohttp import RestAioHttpTransportResponse
from aiohttp import ClientResponse



class ProgressTracker:
Expand Down Expand Up @@ -60,3 +66,76 @@ async def read(self, size: int = -1) -> bytes:
self._offset += len(data)

return data

class MockAioHttpClientResponse(ClientResponse):
def __init__(self, url: str, body_bytes: bytes, headers: Optional[Dict[str, Any]] = None):
self._url = url
self._body = body_bytes
self._headers = headers
self._cache = {}
self._loop = None
self.status = 200
self.reason = "OK"

class MockStorageTransport(AsyncHttpTransport):
async def send(self, request: HttpRequest, **kwargs: Any) -> AioHttpTransportResponse:
if request.method == 'GET':
# download blob
return AioHttpTransportResponse(
request,
MockAioHttpClientResponse(
request.url,
b"test content",
{
"Content-Type": "application/octet-stream",
"Content-Range": "bytes 0-27/28",
"Content-Length": "28",
},
),
)
elif request.method == 'HEAD':
# get blob properties
core_response = RestAioHttpTransportResponse(
request=request,
internal_response=MockAioHttpClientResponse(
request.url,
b"",
{
"Content-Type": "application/octet-stream",
"Content-Length": "1024",
"Content-MD5": "yaNM/IXZgmmMasifdgcavQ=="
},
),
decompress=False
)
# resp = AioHttpTransportResponse(
# request,
# MockAioHttpClientResponse(
# request.url,
# b"",
# {
# "Content-Type": "application/octet-stream",
# "Content-Length": "1024",
# "Content-MD5": "yaNM/IXZgmmMasifdgcavQ=="
# },
# ),
# )
# await resp.read()

# Emulate the logic that would call into read()
await core_response.read()
return core_response

return None

async def __aenter__(self):
return self

async def __aexit__(self, *args):
pass

async def open(self):
pass

async def close(self):
pass
Loading