Skip to content

Commit

Permalink
[Datalake]Batch delete files or empty directories (#21269)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiafu-msft authored Oct 14, 2021
1 parent 4691c52 commit bfd3e81
Show file tree
Hide file tree
Showing 11 changed files with 3,126 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# license information.
# --------------------------------------------------------------------------
import functools
from typing import Optional, Any, Union
from typing import Optional, Any, Union, Iterator


try:
Expand All @@ -14,6 +14,7 @@
from urllib2 import quote, unquote # type: ignore
import six

from azure.core.pipeline.transport import HttpResponse
from azure.core.pipeline import Pipeline
from azure.core.exceptions import HttpResponseError
from azure.core.paging import ItemPaged
Expand Down Expand Up @@ -809,6 +810,71 @@ def _get_root_directory_client(self):
"""
return self.get_directory_client('/')

def delete_files(self, *files, **kwargs):
    # type: (...) -> Iterator[HttpResponse]
    """Marks the specified files or empty directories for deletion.

    The files/empty directories are later deleted during garbage collection.

    If a delete retention policy is enabled for the service, then this operation soft deletes the
    files/empty directories and retains the files or snapshots for specified number of days.
    After specified number of days, files' data is removed from the service during garbage collection.
    Soft deleted files/empty directories are accessible through :func:`list_deleted_paths()`.

    :param files:
        The files/empty directories to delete. This can be a single file/empty directory, or multiple values can
        be supplied, where each value is either the name of the file/directory (str) or
        FileProperties/DirectoryProperties.

        .. note::
            When the file/dir type is dict, here's a list of keys, value rules.

            file or directory name:
                key: 'name', value type: str
            if the file modified or not:
                key: 'if_modified_since', 'if_unmodified_since', value type: datetime
            etag:
                key: 'etag', value type: str
            match the etag or not:
                key: 'match_condition', value type: MatchConditions
            lease:
                key: 'lease_id', value type: Union[str, LeaseClient]
            timeout for subrequest:
                key: 'timeout', value type: int

    :type files: list[str], list[dict],
        or list[Union[~azure.storage.filedatalake.FileProperties, ~azure.storage.filedatalake.DirectoryProperties]]
    :keyword ~datetime.datetime if_modified_since:
        A DateTime value. Azure expects the date value passed in to be UTC.
        If timezone is included, any non-UTC datetimes will be converted to UTC.
        If a date is passed in without timezone info, it is assumed to be UTC.
        Specify this header to perform the operation only
        if the resource has been modified since the specified time.
    :keyword ~datetime.datetime if_unmodified_since:
        A DateTime value. Azure expects the date value passed in to be UTC.
        If timezone is included, any non-UTC datetimes will be converted to UTC.
        If a date is passed in without timezone info, it is assumed to be UTC.
        Specify this header to perform the operation only if
        the resource has not been modified since the specified date/time.
    :keyword bool raise_on_any_failure:
        This is a boolean param which defaults to True. When this is set, an exception
        is raised even if there is a single operation failure.
    :keyword int timeout:
        The timeout parameter is expressed in seconds.
    :return: An iterator of responses, one for each file or directory in order
    :rtype: Iterator[~azure.core.pipeline.transport.HttpResponse]

    .. admonition:: Example:

        .. literalinclude:: ../samples/datalake_samples_file_system.py
            :start-after: [START batch_delete_files_or_empty_directories]
            :end-before: [END batch_delete_files_or_empty_directories]
            :language: python
            :dedent: 4
            :caption: Deleting multiple files or empty directories.
    """
    # The batch is delegated unchanged to the underlying container client's
    # delete_blobs call; positional and keyword arguments are forwarded as-is.
    return self._container_client.delete_blobs(*files, **kwargs)

def get_directory_client(self, directory # type: Union[DirectoryProperties, str]
):
# type: (...) -> DataLakeDirectoryClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def __init__(self, **kwargs):
self.content_settings = ContentSettings(**kwargs)


class PathProperties(object):
class PathProperties(DictMixin):
"""Path properties listed by get_paths api.
:ivar str name: the full path for a file or directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
# pylint: disable=invalid-overridden-method
import functools
from typing import ( # pylint: disable=unused-import
Union, Optional, Any, Dict, TYPE_CHECKING
)
Union, Optional, Any, Dict, TYPE_CHECKING,
AsyncIterator)

from azure.core.pipeline.transport import AsyncHttpResponse

from azure.core.exceptions import HttpResponseError
from azure.core.tracing.decorator import distributed_trace
Expand Down Expand Up @@ -714,6 +716,72 @@ async def delete_file(self, file, # type: Union[FileProperties, str]
await file_client.delete_file(**kwargs)
return file_client

@distributed_trace_async
async def delete_files(self, *files, **kwargs):
    # type: (...) -> AsyncIterator[AsyncHttpResponse]
    """Mark several files and/or empty directories for deletion in one batch.

    The targets are removed later, during garbage collection. When a delete
    retention policy is enabled for the service, the operation is a soft delete:
    the files/empty directories (or snapshots) are retained for the configured
    number of days, after which their data is removed during garbage collection.
    Soft deleted files/empty directories can be listed with
    :func:`list_deleted_paths()`.

    :param files:
        One or more files/empty directories to delete. Each value is either the
        name of the file/directory (str), a FileProperties/DirectoryProperties
        instance, or a dict.

        .. note::
            When a dict is supplied, the following keys are recognised.

            name of the file or directory:
                key: 'name', value type: str
            modification-time conditions:
                key: 'if_modified_since', 'if_unmodified_since', value type: datetime
            etag:
                key: 'etag', value type: str
            how the etag is matched:
                key: 'match_condition', value type: MatchConditions
            lease:
                key: 'lease_id', value type: Union[str, LeaseClient]
            timeout for the sub-request:
                key: 'timeout', value type: int

    :type files: list[str], list[dict],
        or list[Union[~azure.storage.filedatalake.FileProperties, ~azure.storage.filedatalake.DirectoryProperties]]
    :keyword ~datetime.datetime if_modified_since:
        A DateTime value. Azure expects the date value passed in to be UTC.
        If timezone is included, any non-UTC datetimes will be converted to UTC.
        If a date is passed in without timezone info, it is assumed to be UTC.
        Specify this header to perform the operation only
        if the resource has been modified since the specified time.
    :keyword ~datetime.datetime if_unmodified_since:
        A DateTime value. Azure expects the date value passed in to be UTC.
        If timezone is included, any non-UTC datetimes will be converted to UTC.
        If a date is passed in without timezone info, it is assumed to be UTC.
        Specify this header to perform the operation only if
        the resource has not been modified since the specified date/time.
    :keyword bool raise_on_any_failure:
        Defaults to True. When set, an exception is raised even if only a
        single operation in the batch fails.
    :keyword int timeout:
        The timeout parameter is expressed in seconds.
    :return: An iterator of responses, one per requested file or directory, in order
    :rtype: AsyncIterator[~azure.core.pipeline.transport.AsyncHttpResponse]

    .. admonition:: Example:

        .. literalinclude:: ../samples/datalake_samples_file_system_async.py
            :start-after: [START batch_delete_files_or_empty_directories]
            :end-before: [END batch_delete_files_or_empty_directories]
            :language: python
            :dedent: 4
            :caption: Deleting multiple files or empty directories.
    """
    # Forward everything untouched to the container client's batch delete.
    batch_responses = await self._container_client.delete_blobs(*files, **kwargs)
    return batch_responses

@distributed_trace_async
async def _undelete_path(self, deleted_path_name, deletion_id, **kwargs):
# type: (str, str, **Any) -> Union[DataLakeDirectoryClient, DataLakeFileClient]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,55 @@ def create_file_from_file_system(self):

file_system_client.delete_file_system()

# [START batch_delete_files_or_empty_directories]
def batch_delete_files_or_empty_directories(self):
    """Show how to delete several files and empty directories in one batch call.

    Creates a file system, populates it with three files and two empty
    directories, deletes them all through ``delete_files`` and prints the
    status codes of some of the sub-responses. The file system is removed
    afterwards, whether or not the sample succeeded.
    """
    from azure.storage.filedatalake import FileSystemClient
    file_system_client = FileSystemClient.from_connection_string(self.connection_string, "filesystem")

    file_system_client.create_file_system()

    data = b'hello world'

    # Setup errors are allowed to propagate: swallowing them would only defer
    # the failure to a confusing NameError when the batch call is built below.
    try:
        # create file1
        file_system_client.get_file_client('file1').upload_data(data, overwrite=True)

        # create file2, then pass file properties in batch delete later
        file2 = file_system_client.get_file_client('file2')
        file2.upload_data(data, overwrite=True)
        file2_properties = file2.get_file_properties()

        # create file3 and batch delete it later only etag matches this file3 etag
        file3 = file_system_client.get_file_client('file3')
        file3.upload_data(data, overwrite=True)
        file3_etag = file3.get_file_properties().etag

        # create dir1. Empty directory can be deleted using delete_files
        file_system_client.get_directory_client('dir1').create_directory()

        # create dir2, then pass directory properties in batch delete later
        dir2 = file_system_client.get_directory_client('dir2')
        dir2.create_directory()
        dir2_properties = dir2.get_directory_properties()

        # Act
        # delete_files is annotated as returning an iterator, so materialize it
        # before calling len() or indexing into the sub-responses.
        response = list(file_system_client.delete_files(
            'file1',
            file2_properties,
            {'name': 'file3', 'etag': file3_etag},
            'dir1',
            dir2_properties,
            raise_on_any_failure=False
        ))
        # len() returns an int; convert it explicitly before concatenating.
        print("total number of sub-responses:" + str(len(response)))
        print(response[0].status_code)
        print(response[2].status_code)
        print(response[3].status_code)
    finally:
        # Clean up the file system created for this sample.
        file_system_client.delete_file_system()
# [END batch_delete_files_or_empty_directories]


if __name__ == '__main__':
sample = FileSystemSamples()
sample.file_system_sample()
Expand All @@ -217,3 +266,4 @@ def create_file_from_file_system(self):
sample.list_paths_in_file_system()
sample.get_file_client_from_file_system()
sample.create_file_from_file_system()
sample.batch_delete_files_or_empty_directories()
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,64 @@ async def create_file_from_file_system(self):

await file_system_client.delete_file_system()

# [START batch_delete_files_or_empty_directories]
async def batch_delete_files_or_empty_directories(self):
    """Show how to delete several files and empty directories in one batch call.

    Creates a file system, populates it with three files and two empty
    directories, deletes them all through ``delete_files`` and prints the
    status codes of some of the sub-responses. The file system is removed
    afterwards, whether or not the sample succeeded.
    """
    from azure.storage.filedatalake.aio import FileSystemClient
    file_system_client = FileSystemClient.from_connection_string(self.connection_string, "filesystemforcreate")

    async with file_system_client:
        await file_system_client.create_file_system()

        data = b'hello world'

        # Setup errors are allowed to propagate: swallowing them would only
        # defer the failure to a confusing NameError when the batch call is
        # built below.
        try:
            # create file1
            await file_system_client.get_file_client('file1').upload_data(data, overwrite=True)

            # create file2, then pass file properties in batch delete later
            file2 = file_system_client.get_file_client('file2')
            await file2.upload_data(data, overwrite=True)
            file2_properties = await file2.get_file_properties()

            # create file3 and batch delete it later only etag matches this file3 etag
            file3 = file_system_client.get_file_client('file3')
            await file3.upload_data(data, overwrite=True)
            file3_props = await file3.get_file_properties()
            file3_etag = file3_props.etag

            # create dir1
            # empty directory can be deleted using delete_files
            await file_system_client.get_directory_client('dir1').create_directory()

            # create dir2, then pass directory properties in batch delete later
            dir2 = file_system_client.get_directory_client('dir2')
            await dir2.create_directory()
            dir2_properties = await dir2.get_directory_properties()

            # Act
            # The async iterator of sub-responses is drained into a list so it
            # can be measured and indexed.
            response = await self._to_list(await file_system_client.delete_files(
                'file1',
                file2_properties,
                {'name': 'file3', 'etag': file3_etag},
                'dir1',
                dir2_properties,
                raise_on_any_failure=False
            ))
            # len() returns an int; convert it explicitly before concatenating.
            print("total number of sub-responses:" + str(len(response)))
            print(response[0].status_code)
            print(response[2].status_code)
            print(response[3].status_code)
        finally:
            # Clean up the file system created for this sample.
            await file_system_client.delete_file_system()

async def _to_list(self, async_iterator):
    """Drain ``async_iterator`` and return its items, in order, as a list."""
    return [entry async for entry in async_iterator]
# [END batch_delete_files_or_empty_directories]


async def run():
sample = FileSystemSamplesAsync()
Expand All @@ -226,6 +284,7 @@ async def run():
await sample.list_paths_in_file_system()
await sample.get_file_client_from_file_system()
await sample.create_file_from_file_system()
await sample.batch_delete_files_or_empty_directories()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
Expand Down
Loading

0 comments on commit bfd3e81

Please sign in to comment.