Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage File Service #425

Merged
merged 2 commits into from
Jul 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions azure-storage/azure/storage/_common_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,6 @@ def _get_request_body_bytes_only(param_name, param_value):
if isinstance(param_value, bytes):
return param_value

# Previous versions of the SDK allowed data types other than bytes to be
# passed in, and they would be auto-converted to bytes. We preserve this
# behavior when running under 2.7, but issue a warning.
# Python 3 support is new, so we reject anything that's not bytes.
if sys.version_info < (3,):
warnings.warn(_WARNING_VALUE_SHOULD_BE_BYTES.format(param_name))
return _get_request_body(param_value)

raise TypeError(_ERROR_VALUE_SHOULD_BE_BYTES.format(param_name))


Expand Down
18 changes: 6 additions & 12 deletions azure-storage/azure/storage/blob/blobservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class BlobService(_StorageClient):
This is the main class managing Blob resources.
'''

_BLOB_MAX_DATA_SIZE = 64 * 1024 * 1024
_BLOB_MAX_CHUNK_DATA_SIZE = 4 * 1024 * 1024

def __init__(self, account_name=None, account_key=None, protocol='https',
host_base=BLOB_SERVICE_HOST_BASE, dev_host=DEV_BLOB_HOST,
timeout=DEFAULT_HTTP_TIMEOUT, sas_token=None, connection_string=None,
Expand Down Expand Up @@ -141,8 +144,6 @@ def __init__(self, account_name=None, account_key=None, protocol='https',
protocol = connection_params.protocol.lower()
host_base = connection_params.host_base_blob

self._BLOB_MAX_DATA_SIZE = 64 * 1024 * 1024
self._BLOB_MAX_CHUNK_DATA_SIZE = 4 * 1024 * 1024
super(BlobService, self).__init__(
account_name, account_key, protocol, host_base, dev_host, timeout, sas_token, request_session)

Expand Down Expand Up @@ -179,17 +180,10 @@ def make_blob_url(self, container_name, blob_name, account_name=None,
generate_shared_access_signature.
'''

if not account_name:
account_name = self.account_name
if not protocol:
protocol = self.protocol
if not host_base:
host_base = self.host_base

url = '{0}://{1}{2}/{3}/{4}'.format(
protocol,
account_name,
host_base,
protocol or self.protocol,
account_name or self.account_name,
host_base or self.host_base,
container_name,
blob_name,
)
Expand Down
6 changes: 5 additions & 1 deletion azure-storage/azure/storage/cloudstorageaccount.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
class CloudStorageAccount(object):

"""
Provides a factory for creating the blob, queue, and table services
Provides a factory for creating the blob, queue, table, and file services
with a common account name and account key. Users can either use the
factory or can construct the appropriate service directly.
"""
Expand All @@ -41,3 +41,7 @@ def create_table_service(self):
def create_queue_service(self):
    '''Return a QueueService bound to this account's name and key.'''
    from .queue import queueservice
    return queueservice.QueueService(self.account_name, self.account_key)

def create_file_service(self):
    '''Return a FileService bound to this account's name and key.'''
    from .file import fileservice
    return fileservice.FileService(self.account_name, self.account_key)
3 changes: 3 additions & 0 deletions azure-storage/azure/storage/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
BLOB_SERVICE_HOST_BASE,
TABLE_SERVICE_HOST_BASE,
QUEUE_SERVICE_HOST_BASE,
FILE_SERVICE_HOST_BASE
)


Expand All @@ -43,3 +44,5 @@ def __init__(self, connection_string = ''):
else ".table.{}".format(endpoint_suffix)
self.host_base_queue = QUEUE_SERVICE_HOST_BASE if endpoint_suffix is None \
else ".queue.{}".format(endpoint_suffix)
self.host_base_file = FILE_SERVICE_HOST_BASE if endpoint_suffix is None \
else ".file.{}".format(endpoint_suffix)
2 changes: 2 additions & 0 deletions azure-storage/azure/storage/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
BLOB_SERVICE_HOST_BASE = '.blob.core.windows.net'
QUEUE_SERVICE_HOST_BASE = '.queue.core.windows.net'
TABLE_SERVICE_HOST_BASE = '.table.core.windows.net'
FILE_SERVICE_HOST_BASE = '.file.core.windows.net'

# Development ServiceClient URLs
DEV_BLOB_HOST = '127.0.0.1:10000'
DEV_QUEUE_HOST = '127.0.0.1:10001'
DEV_TABLE_HOST = '127.0.0.1:10002'
DEV_FILE_HOST = '127.0.0.1:10003'

# Default credentials for Development Storage Service
DEV_ACCOUNT_NAME = 'devstoreaccount1'
Expand Down
33 changes: 33 additions & 0 deletions azure-storage/azure/storage/file/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#-------------------------------------------------------------------------
# Copyright (c) Microsoft. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#--------------------------------------------------------------------------
from ..constants import (
FILE_SERVICE_HOST_BASE,
DEV_FILE_HOST,
)

from .models import (
ShareEnumResults,
Share,
Properties,
FileAndDirectoryEnumResults,
FileResult,
File,
FileProperties,
Directory,
Range,
RangeList,
)

from .fileservice import FileService
255 changes: 255 additions & 0 deletions azure-storage/azure/storage/file/_chunking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
#-------------------------------------------------------------------------
# Copyright (c) Microsoft. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#--------------------------------------------------------------------------
import sys
import threading

from time import sleep
from .._common_conversion import _encode_base64
from .._common_serialization import url_quote


class _FileChunkDownloader(object):
def __init__(self, file_service, share_name, directory_name, file_name,
file_size, chunk_size, stream, parallel, max_retries, retry_wait,
progress_callback):
self.file_service = file_service
self.share_name = share_name
self.directory_name = directory_name
self.file_name = file_name
self.file_size = file_size
self.chunk_size = chunk_size
self.stream = stream
self.stream_start = stream.tell() if parallel else None
self.stream_lock = threading.Lock() if parallel else None
self.progress_callback = progress_callback
self.progress_total = 0
self.progress_lock = threading.Lock() if parallel else None
self.max_retries = max_retries
self.retry_wait = retry_wait

def get_chunk_offsets(self):
index = 0
while index < self.file_size:
yield index
index += self.chunk_size

def process_chunk(self, chunk_offset):
chunk_data = self._download_chunk_with_retries(chunk_offset)
length = len(chunk_data)
if length > 0:
self._write_to_stream(chunk_data, chunk_offset)
self._update_progress(length)

def _update_progress(self, length):
if self.progress_callback is not None:
if self.progress_lock is not None:
with self.progress_lock:
self.progress_total += length
total = self.progress_total
else:
self.progress_total += length
total = self.progress_total
self.progress_callback(total, self.file_size)

def _write_to_stream(self, chunk_data, chunk_offset):
if self.stream_lock is not None:
with self.stream_lock:
self.stream.seek(self.stream_start + chunk_offset)
self.stream.write(chunk_data)
else:
self.stream.write(chunk_data)

def _download_chunk_with_retries(self, chunk_offset):
range_id = 'bytes={0}-{1}'.format(chunk_offset, chunk_offset + self.chunk_size - 1)
retries = self.max_retries
while True:
try:
return self.file_service.get_file(
self.share_name,
self.directory_name,
self.file_name,
x_ms_range=range_id
)
except Exception:
if retries > 0:
retries -= 1
sleep(self.retry_wait)
else:
raise


class _FileChunkUploader(object):
def __init__(self, file_service, share_name, directory_name, file_name,
file_size, chunk_size, stream, parallel, max_retries, retry_wait,
progress_callback):
self.file_service = file_service
self.share_name = share_name
self.directory_name = directory_name
self.file_name = file_name
self.file_size = file_size
self.chunk_size = chunk_size
self.stream = stream
self.stream_start = stream.tell() if parallel else None
self.stream_lock = threading.Lock() if parallel else None
self.progress_callback = progress_callback
self.progress_total = 0
self.progress_lock = threading.Lock() if parallel else None
self.max_retries = max_retries
self.retry_wait = retry_wait

def get_chunk_offsets(self):
index = 0
if self.file_size is None:
# we don't know the size of the stream, so we have no
# choice but to seek
while True:
data = self._read_from_stream(index, 1)
if not data:
break
yield index
index += self.chunk_size
else:
while index < self.file_size:
yield index
index += self.chunk_size

def process_chunk(self, chunk_offset):
size = self.chunk_size
if self.file_size is not None:
size = min(size, self.file_size - chunk_offset)
chunk_data = self._read_from_stream(chunk_offset, size)
return self._upload_chunk_with_retries(chunk_offset, chunk_data)

def process_all_unknown_size(self):
assert self.stream_lock is None
range_ids = []
index = 0
while True:
data = self._read_from_stream(None, self.chunk_size)
if data:
index += len(data)
range_id = self._upload_chunk_with_retries(index, data)
range_ids.append(range_id)
else:
break

return range_ids

def _read_from_stream(self, offset, count):
if self.stream_lock is not None:
with self.stream_lock:
self.stream.seek(self.stream_start + offset)
data = self.stream.read(count)
else:
data = self.stream.read(count)
return data

def _update_progress(self, length):
if self.progress_callback is not None:
if self.progress_lock is not None:
with self.progress_lock:
self.progress_total += length
total = self.progress_total
else:
self.progress_total += length
total = self.progress_total
self.progress_callback(total, self.file_size)

def _upload_chunk_with_retries(self, chunk_offset, chunk_data):
retries = self.max_retries
while True:
try:
range_id = self._upload_chunk(chunk_offset, chunk_data)
self._update_progress(len(chunk_data))
return range_id
except Exception:
if retries > 0:
retries -= 1
sleep(self.retry_wait)
else:
raise

def _upload_chunk(self, chunk_offset, chunk_data):
range_id = 'bytes={0}-{1}'.format(chunk_offset, chunk_offset + len(chunk_data) - 1)
self.file_service.update_range(
self.share_name,
self.directory_name,
self.file_name,
chunk_data,
range_id,
)
return range_id


def _download_file_chunks(file_service, share_name, directory_name, file_name,
                          file_size, block_size, stream, max_connections,
                          max_retries, retry_wait, progress_callback):
    '''Download a remote file into ``stream`` in chunks of ``block_size``.

    Uses a thread pool of ``max_connections`` workers when greater than 1,
    otherwise downloads sequentially. ``progress_callback(current, total)``
    is invoked with 0 before the first chunk and after every chunk.
    '''
    downloader = _FileChunkDownloader(
        file_service,
        share_name,
        directory_name,
        file_name,
        file_size,
        block_size,
        stream,
        max_connections > 1,
        max_retries,
        retry_wait,
        progress_callback,
    )

    if progress_callback is not None:
        progress_callback(0, file_size)

    if max_connections > 1:
        # Imported lazily: only needed (and only available on 2.x via the
        # futures backport) for parallel transfers.
        import concurrent.futures
        # FIX: the executor was previously never shut down, leaking its
        # worker threads until GC; the with-block joins them. Draining the
        # map iterator also re-raises any worker exception here.
        with concurrent.futures.ThreadPoolExecutor(max_connections) as executor:
            list(executor.map(downloader.process_chunk,
                              downloader.get_chunk_offsets()))
    else:
        for chunk_offset in downloader.get_chunk_offsets():
            downloader.process_chunk(chunk_offset)


def _upload_file_chunks(file_service, share_name, directory_name, file_name,
                        file_size, block_size, stream, max_connections,
                        max_retries, retry_wait, progress_callback):
    '''Upload ``stream`` to a remote file in chunks of ``block_size``.

    Uses a thread pool of ``max_connections`` workers when greater than 1,
    otherwise uploads sequentially (the only mode that supports a stream of
    unknown length, i.e. ``file_size is None``). Returns the list of
    uploaded range ids. ``progress_callback(current, total)`` is invoked
    with 0 before the first chunk and after every chunk.
    '''
    uploader = _FileChunkUploader(
        file_service,
        share_name,
        directory_name,
        file_name,
        file_size,
        block_size,
        stream,
        max_connections > 1,
        max_retries,
        retry_wait,
        progress_callback,
    )

    if progress_callback is not None:
        progress_callback(0, file_size)

    if max_connections > 1:
        # Imported lazily: only needed (and only available on 2.x via the
        # futures backport) for parallel transfers.
        import concurrent.futures
        # FIX: the executor was previously never shut down, leaking its
        # worker threads until GC; the with-block joins them.
        with concurrent.futures.ThreadPoolExecutor(max_connections) as executor:
            range_ids = list(executor.map(uploader.process_chunk,
                                          uploader.get_chunk_offsets()))
    else:
        if file_size is not None:
            range_ids = [uploader.process_chunk(start)
                         for start in uploader.get_chunk_offsets()]
        else:
            range_ids = uploader.process_all_unknown_size()

    return range_ids
Loading