
Commit: Set expiry (Azure#12642)
* [DataLake][SetExpiry] Set Expiry of DataLake File

* address comments

* use datalake set_expiry operation

* add serialize rfc1123 and fix pylint

* fix pylint

* remove return type
xiafu-msft committed Oct 1, 2020
1 parent b115825 commit c733ae0
Showing 22 changed files with 606 additions and 152 deletions.
@@ -576,7 +576,7 @@ def _download_blob_options(self, offset=None, length=None, **kwargs):
'lease_access_conditions': access_conditions,
'modified_access_conditions': mod_conditions,
'cpk_info': cpk_info,
'cls': deserialize_blob_stream,
'cls': kwargs.pop('cls', None) or deserialize_blob_stream,
'max_concurrency':kwargs.pop('max_concurrency', 1),
'encoding': kwargs.pop('encoding', None),
'timeout': kwargs.pop('timeout', None),
@@ -1038,14 +1038,15 @@ def get_blob_properties(self, **kwargs):
snapshot=self.snapshot,
lease_access_conditions=access_conditions,
modified_access_conditions=mod_conditions,
cls=deserialize_blob_properties,
cls=kwargs.pop('cls', None) or deserialize_blob_properties,
cpk_info=cpk_info,
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
blob_props.name = self.blob_name
blob_props.snapshot = self.snapshot
blob_props.container = self.container_name
if isinstance(blob_props, BlobProperties):
blob_props.container = self.container_name
blob_props.snapshot = self.snapshot
return blob_props # type: ignore

def _set_http_headers_options(self, content_settings=None, **kwargs):
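The change above threads an optional cls response callback through _download_blob_options and get_blob_properties, so a caller (here, the Data Lake layer) can substitute its own deserializer while deserialize_blob_stream / deserialize_blob_properties remain the defaults; the new isinstance guard only fills container and snapshot when the default BlobProperties actually came back. As a rough illustration only (not part of this commit; the callback name is hypothetical), a cls callable receives the pipeline response, the deserialized generated model, and the response headers:

def deserialize_into_dict(pipeline_response, deserialized, response_headers):
    # Hypothetical callback: the generated operation invokes
    # cls(pipeline_response, deserialized, response_headers) and returns its result.
    return {'headers': dict(response_headers), 'model': deserialized}

# blob_client.get_blob_properties(cls=deserialize_into_dict) would then return
# this dict instead of the BlobProperties built by deserialize_blob_properties.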
@@ -1119,7 +1119,7 @@ async def set_expiry(self, expiry_options, timeout=None, request_id=None, expire
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id", request_id, 'str')
header_parameters['x-ms-expiry-option'] = self._serialize.header("expiry_options", expiry_options, 'str')
if expires_on is not None:
header_parameters['x-ms-expiry-time'] = self._serialize.header("expires_on", expires_on, 'str')
header_parameters['x-ms-expiry-time'] = self._serialize.header("expires_on", expires_on, 'rfc-1123')

# Construct and send request
request = self._client.put(url, query_parameters, header_parameters)
@@ -1118,7 +1118,7 @@ def set_expiry(self, expiry_options, timeout=None, request_id=None, expires_on=N
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id", request_id, 'str')
header_parameters['x-ms-expiry-option'] = self._serialize.header("expiry_options", expiry_options, 'str')
if expires_on is not None:
header_parameters['x-ms-expiry-time'] = self._serialize.header("expires_on", expires_on, 'str')
header_parameters['x-ms-expiry-time'] = self._serialize.header("expires_on", expires_on, 'rfc-1123')

# Construct and send request
request = self._client.put(url, query_parameters, header_parameters)
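Both the async and sync generated set_expiry operations above now serialize expires_on with the 'rfc-1123' hint instead of 'str', so a datetime is rendered as an RFC 1123 date in the x-ms-expiry-time header. For reference only (this helper is not part of the commit), the same shape can be produced with the standard library:

from datetime import datetime, timezone

def to_rfc1123(dt):
    # Illustrative only: RFC 1123 dates are expressed in GMT,
    # e.g. 'Thu, 01 Oct 2020 07:30:00 GMT'. %a/%b assume an English locale.
    return dt.astimezone(timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')

print(to_rfc1123(datetime(2020, 10, 1, 7, 30, tzinfo=timezone.utc)))
# Thu, 01 Oct 2020 07:30:00 GMT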
Expand Up @@ -561,14 +561,15 @@ async def get_blob_properties(self, **kwargs):
snapshot=self.snapshot,
lease_access_conditions=access_conditions,
modified_access_conditions=mod_conditions,
cls=deserialize_blob_properties,
cls=kwargs.pop('cls', None) or deserialize_blob_properties,
cpk_info=cpk_info,
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
blob_props.name = self.blob_name
blob_props.snapshot = self.snapshot
blob_props.container = self.container_name
if isinstance(blob_props, BlobProperties):
blob_props.container = self.container_name
blob_props.snapshot = self.snapshot
return blob_props # type: ignore

@distributed_trace_async
@@ -18,7 +18,6 @@
DirectoryProperties,
FileProperties,
PathProperties,
PathPropertiesPaged,
LeaseProperties,
ContentSettings,
AccountSasPermissions,
@@ -38,6 +37,7 @@
AccessControlChangeFailure,
AccessControlChanges,
)

from ._shared_access_signature import generate_account_sas, generate_file_system_sas, generate_directory_sas, \
generate_file_sas

@@ -66,7 +66,6 @@
'DirectoryProperties',
'FileProperties',
'PathProperties',
'PathPropertiesPaged',
'LeaseProperties',
'ContentSettings',
'AccessControlChangeResult',
@@ -3,12 +3,11 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

try:
from urllib.parse import quote, unquote
except ImportError:
from urllib2 import quote, unquote # type: ignore

from ._deserialize import deserialize_dir_properties
from ._shared.base_client import parse_connection_str
from ._data_lake_file_client import DataLakeFileClient
from ._models import DirectoryProperties
@@ -236,8 +235,7 @@ def get_directory_properties(self, **kwargs):
:dedent: 4
:caption: Getting the properties for a file/directory.
"""
blob_properties = self._get_path_properties(**kwargs)
return DirectoryProperties._from_blob_properties(blob_properties) # pylint: disable=protected-access
return self._get_path_properties(cls=deserialize_dir_properties, **kwargs) # pylint: disable=protected-access

def rename_directory(self, new_name, # type: str
**kwargs):
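With deserialize_dir_properties passed as the cls callback above, get_directory_properties builds a DirectoryProperties directly from the response instead of converting a BlobProperties afterwards. Caller-facing usage is unchanged; a brief sketch, assuming an already-constructed DataLakeDirectoryClient named directory_client:

# Sketch only: 'directory_client' is an assumed, already-constructed DataLakeDirectoryClient.
props = directory_client.get_directory_properties()
print(props.etag, props.last_modified, props.metadata)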
@@ -21,8 +21,9 @@
from ._generated.models import StorageErrorException
from ._download import StorageStreamDownloader
from ._path_client import PathClient
from ._serialize import get_mod_conditions, get_path_http_headers, get_access_conditions, add_metadata_headers
from ._deserialize import process_storage_error
from ._serialize import get_mod_conditions, get_path_http_headers, get_access_conditions, add_metadata_headers, \
convert_datetime_to_rfc1123
from ._deserialize import process_storage_error, deserialize_file_properties
from ._models import FileProperties, DataLakeFileQueryError


@@ -246,8 +247,31 @@ def get_file_properties(self, **kwargs):
:dedent: 4
:caption: Getting the properties for a file.
"""
blob_properties = self._get_path_properties(**kwargs)
return FileProperties._from_blob_properties(blob_properties) # pylint: disable=protected-access
return self._get_path_properties(cls=deserialize_file_properties, **kwargs) # pylint: disable=protected-access

def set_file_expiry(self, expiry_options, # type: str
expires_on=None, # type: Optional[Union[datetime, int]]
**kwargs):
# type: (str, Optional[Union[datetime, int]], **Any) -> None
"""Sets the time a file will expire and be deleted.
:param str expiry_options:
Required. Indicates mode of the expiry time.
Possible values include: 'NeverExpire', 'RelativeToCreation', 'RelativeToNow', 'Absolute'
:param datetime or int expires_on:
The time to set the file to expiry.
When expiry_options is RelativeTo*, expires_on should be an int in milliseconds.
If the type of expires_on is datetime, it should be in UTC time.
:keyword int timeout:
The timeout parameter is expressed in seconds.
:rtype: None
"""
try:
expires_on = convert_datetime_to_rfc1123(expires_on)
except AttributeError:
expires_on = str(expires_on)
self._datalake_client_for_blob_operation.path \
.set_expiry(expiry_options, expires_on=expires_on, **kwargs) # pylint: disable=protected-access

def _upload_options( # pylint:disable=too-many-statements
self, data, # type: Union[Iterable[AnyStr], IO[AnyStr]]
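A short usage sketch for the new set_file_expiry method, assuming an already-constructed DataLakeFileClient named file_client (client construction is not part of this diff):

from datetime import datetime, timedelta, timezone

# Expire the file at an absolute UTC time...
file_client.set_file_expiry("Absolute", expires_on=datetime.now(timezone.utc) + timedelta(days=1))

# ...or relative to now, where expires_on is a duration in milliseconds.
file_client.set_file_expiry("RelativeToNow", expires_on=24 * 60 * 60 * 1000)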
@@ -12,6 +12,7 @@
from azure.core.pipeline.policies import ContentDecodePolicy
from azure.core.exceptions import HttpResponseError, DecodeError, ResourceModifiedError, ClientAuthenticationError, \
ResourceNotFoundError, ResourceExistsError
from ._models import FileProperties, DirectoryProperties, LeaseProperties
from ._shared.models import StorageErrorCode

if TYPE_CHECKING:
@@ -20,6 +21,45 @@
_LOGGER = logging.getLogger(__name__)


def deserialize_dir_properties(response, obj, headers):
metadata = deserialize_metadata(response, obj, headers)
dir_properties = DirectoryProperties(
metadata=metadata,
**headers
)
return dir_properties


def deserialize_file_properties(response, obj, headers):
metadata = deserialize_metadata(response, obj, headers)
file_properties = FileProperties(
metadata=metadata,
**headers
)
if 'Content-Range' in headers:
if 'x-ms-blob-content-md5' in headers:
file_properties.content_settings.content_md5 = headers['x-ms-blob-content-md5']
else:
file_properties.content_settings.content_md5 = None
return file_properties


def from_blob_properties(blob_properties):
file_props = FileProperties()
file_props.name = blob_properties.name
file_props.etag = blob_properties.etag
file_props.deleted = blob_properties.deleted
file_props.metadata = blob_properties.metadata
file_props.lease = blob_properties.lease
file_props.lease.__class__ = LeaseProperties
file_props.last_modified = blob_properties.last_modified
file_props.creation_time = blob_properties.creation_time
file_props.size = blob_properties.size
file_props.deleted_time = blob_properties.deleted_time
file_props.remaining_retention_days = blob_properties.remaining_retention_days
file_props.content_settings = blob_properties.content_settings
return file_props

def normalize_headers(headers):
normalized = {}
for key, value in headers.items():
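One detail of from_blob_properties above: it reuses the lease object from the blob layer and simply rebinds its class to the Data Lake LeaseProperties via __class__ assignment rather than copying fields. A minimal illustration of that pattern with toy classes (not from this codebase):

class BlobLease(object):
    def __init__(self):
        self.status, self.state = "unlocked", "available"

class DataLakeLease(BlobLease):
    def describe(self):
        return "%s/%s" % (self.status, self.state)

lease = BlobLease()
lease.__class__ = DataLakeLease   # same instance data, new behaviour
print(lease.describe())           # unlocked/available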
@@ -3,8 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from ._models import FileProperties
from ._deserialize import from_blob_properties


class StorageStreamDownloader(object):
@@ -23,7 +22,7 @@ class StorageStreamDownloader(object):
def __init__(self, downloader):
self._downloader = downloader
self.name = self._downloader.name
self.properties = FileProperties._from_blob_properties(self._downloader.properties) # pylint: disable=protected-access
self.properties = from_blob_properties(self._downloader.properties) # pylint: disable=protected-access
self.size = self._downloader.size

def __len__(self):
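The downloader now obtains its FileProperties through the module-level from_blob_properties helper instead of the private FileProperties._from_blob_properties classmethod; what callers see is unchanged. A brief sketch, assuming an already-constructed DataLakeFileClient named file_client:

# Sketch only: 'file_client' is an assumed DataLakeFileClient.
downloader = file_client.download_file()
print(downloader.properties.name, downloader.properties.size)
data = downloader.readall()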
@@ -16,7 +16,8 @@
from azure.storage.blob import ContainerClient
from ._shared.base_client import StorageAccountHostsMixin, parse_query, parse_connection_str
from ._serialize import convert_dfs_url_to_blob_url
from ._models import LocationMode, FileSystemProperties, PathPropertiesPaged, PublicAccess
from ._models import LocationMode, FileSystemProperties, PublicAccess
from ._list_paths_helper import PathPropertiesPaged
from ._data_lake_file_client import DataLakeFileClient
from ._data_lake_directory_client import DataLakeDirectoryClient
from ._data_lake_lease import DataLakeLeaseClient
@@ -0,0 +1,73 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from azure.core.paging import PageIterator
from ._generated.models import StorageErrorException
from ._models import PathProperties
from ._deserialize import return_headers_and_deserialized_path_list
from ._generated.models import Path
from ._shared.response_handlers import process_storage_error


class PathPropertiesPaged(PageIterator):
"""An Iterable of Path properties.
:ivar str path: Filters the results to return only paths under the specified path.
:ivar int results_per_page: The maximum number of results retrieved per API call.
:ivar str continuation_token: The continuation token to retrieve the next page of results.
:ivar list(~azure.storage.filedatalake.PathProperties) current_page: The current page of listed results.
:param callable command: Function to retrieve the next page of items.
:param str path: Filters the results to return only paths under the specified path.
:param int max_results: The maximum number of paths to retrieve per
call.
:param str continuation_token: An opaque continuation token.
"""
def __init__(
self, command,
recursive,
path=None,
max_results=None,
continuation_token=None,
upn=None):
super(PathPropertiesPaged, self).__init__(
get_next=self._get_next_cb,
extract_data=self._extract_data_cb,
continuation_token=continuation_token or ""
)
self._command = command
self.recursive = recursive
self.results_per_page = max_results
self.path = path
self.upn = upn
self.current_page = None
self.path_list = None

def _get_next_cb(self, continuation_token):
try:
return self._command(
self.recursive,
continuation=continuation_token or None,
path=self.path,
max_results=self.results_per_page,
upn=self.upn,
cls=return_headers_and_deserialized_path_list)
except StorageErrorException as error:
process_storage_error(error)

def _extract_data_cb(self, get_next_return):
self.path_list, self._response = get_next_return
self.current_page = [self._build_item(item) for item in self.path_list]

return self._response['continuation'] or None, self.current_page

@staticmethod
def _build_item(item):
if isinstance(item, PathProperties):
return item
if isinstance(item, Path):
path = PathProperties._from_generated(item) # pylint: disable=protected-access
return path
return item
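This new pager backs FileSystemClient.get_paths, which (per the import change above) now pulls PathPropertiesPaged from _list_paths_helper instead of _models. A usage sketch, assuming an already-constructed FileSystemClient named file_system_client:

# Sketch only: 'file_system_client' is an assumed FileSystemClient.
# get_paths returns an iterator of PathProperties driven by this pager.
for path in file_system_client.get_paths(path="my-directory", recursive=True):
    print(path.name, path.is_directory)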
