Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement fine-grained file cache control options #314

Merged
merged 18 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# cloudpathlib Changelog

## v0.13.0 (UNRELEASED)

- Implement `file_cache_mode`s to give users finer-grained control over when and how the cache is cleared. ([Issue #10](https://github.com/drivendataorg/cloudpathlib/issues/10), [PR #314](https://github.com/drivendataorg/cloudpathlib/pull/314))

## v0.12.1 (2023-01-04)

- Fix glob logic for buckets; add regression test; add error on globbing all buckets ([Issue #311](https://github.com/drivendataorg/cloudpathlib/issues/311), [PR #312](https://github.com/drivendataorg/cloudpathlib/pull/312))
Expand Down
11 changes: 10 additions & 1 deletion cloudpathlib/azure/azblobclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from ..client import Client, register_client_class
from ..cloudpath import implementation_registry
from ..enums import FileCacheMode
from ..exceptions import MissingCredentialsError
from .azblobpath import AzureBlobPath

Expand All @@ -32,6 +33,7 @@ def __init__(
credential: Optional[Any] = None,
connection_string: Optional[str] = None,
blob_service_client: Optional["BlobServiceClient"] = None,
file_cache_mode: Optional[Union[str, FileCacheMode]] = None,
pjbull marked this conversation as resolved.
Show resolved Hide resolved
local_cache_dir: Optional[Union[str, os.PathLike]] = None,
content_type_method: Optional[Callable] = mimetypes.guess_type,
):
Expand Down Expand Up @@ -68,12 +70,19 @@ def __init__(
https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python#copy-your-credentials-from-the-azure-portal).
blob_service_client (Optional[BlobServiceClient]): Instantiated [`BlobServiceClient`](
https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobserviceclient?view=azure-python).
file_cache_mode (Optional[Union[str, FileCacheMode]]): How often to clear the file cache; see
[the caching docs](https://cloudpathlib.drivendata.org/stable/caching/) for more information
about the options in cloudpathlib.enums.FileCacheMode.
local_cache_dir (Optional[Union[str, os.PathLike]]): Path to directory to use as cache
for downloaded files. If None, will use a temporary directory.
content_type_method (Optional[Callable]): Function to call to guess media type (mimetype) when
writing a file to the cloud. Defaults to `mimetypes.guess_type`. Must return a tuple (content type, content encoding).
"""
super().__init__(local_cache_dir=local_cache_dir, content_type_method=content_type_method)
super().__init__(
local_cache_dir=local_cache_dir,
content_type_method=content_type_method,
file_cache_mode=file_cache_mode,
)

if connection_string is None:
connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING", None)
Expand Down
61 changes: 57 additions & 4 deletions cloudpathlib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
import mimetypes
import os
from pathlib import Path
import shutil
from tempfile import TemporaryDirectory
from typing import Generic, Callable, Iterable, Optional, Tuple, TypeVar, Union

from .cloudpath import CloudImplementation, CloudPath, implementation_registry
from .enums import FileCacheMode
from .exceptions import InvalidConfigurationException


BoundedCloudPath = TypeVar("BoundedCloudPath", bound=CloudPath)

Expand All @@ -28,23 +32,61 @@ class Client(abc.ABC, Generic[BoundedCloudPath]):

def __init__(
    self,
    file_cache_mode: Optional[Union[str, FileCacheMode]] = None,
    local_cache_dir: Optional[Union[str, os.PathLike]] = None,
    content_type_method: Optional[Callable] = mimetypes.guess_type,
):
    """Set up the local file cache and decide when it gets cleared.

    The cache mode is resolved in this order: the `file_cache_mode` argument,
    then `FileCacheMode.from_environment()`, then `persistent` if a
    `local_cache_dir` was passed, and finally `tmp_dir`.

    Args:
        file_cache_mode (Optional[Union[str, FileCacheMode]]): Cache mode, either as a
            `FileCacheMode` member or as its string value.
        local_cache_dir (Optional[Union[str, os.PathLike]]): Directory to use as the cache.
            If None, a `TemporaryDirectory` is created and used instead.
        content_type_method (Optional[Callable]): Stored on the client as
            `content_type_method`. Defaults to `mimetypes.guess_type`.

    Raises:
        InvalidConfigurationException: If the resolved mode is `persistent` and no
            `local_cache_dir` was passed.
        ValueError: If `file_cache_mode` is a string that is not a `FileCacheMode` value.
    """
    # assigned before anything below can raise; `__del__` reads both attributes
    self.file_cache_mode = None
    self._cache_tmp_dir = None
    self._cloud_meta.validate_completeness()

    # convert strings passed to enum
    if isinstance(file_cache_mode, str):
        file_cache_mode = FileCacheMode(file_cache_mode)

    # if not explicitly passed to client, get from env var
    if file_cache_mode is None:
        file_cache_mode = FileCacheMode.from_environment()

    # explicitly passing a cache dir, so we set to persistent
    # unless user explicitly passes a different file cache mode
    if local_cache_dir and file_cache_mode is None:
        file_cache_mode = FileCacheMode.persistent

    if file_cache_mode == FileCacheMode.persistent and local_cache_dir is None:
        # use `.value`: how a `str`-mixin enum member renders inside an f-string
        # differs between Python versions, the value does not
        raise InvalidConfigurationException(
            f"If you use the '{FileCacheMode.persistent.value}' cache mode, you must pass a `local_cache_dir` when you instantiate the client."
        )

    # if no explicit local dir, setup caching in temporary dir
    if local_cache_dir is None:
        self._cache_tmp_dir = TemporaryDirectory()
        local_cache_dir = self._cache_tmp_dir.name

    self._local_cache_dir = Path(local_cache_dir)
    self.content_type_method = content_type_method

    # Fallback: if not set anywhere, default to tmp_dir (for backwards compatibility)
    if file_cache_mode is None:
        file_cache_mode = FileCacheMode.tmp_dir

    self.file_cache_mode = file_cache_mode

def __del__(self) -> None:
    """Clean up the local cache when the client object is finalized.

    Cleans up the temporary directory if this client created one. For the
    `tmp_dir`, `close_file` and `cloudpath_object` cache modes it also clears
    the cache contents and removes the cache directory itself.
    """
    # `__del__` also runs on instances whose `__init__` raised before these
    # attributes were assigned (e.g. a subclass failing before it calls
    # `super().__init__()`), so read them defensively
    tmp_dir = getattr(self, "_cache_tmp_dir", None)

    # make sure temp is cleaned up if we created it
    if tmp_dir is not None:
        tmp_dir.cleanup()

    # remove containing dir, even if a more aggressive strategy
    # removed the actual files
    if getattr(self, "file_cache_mode", None) in [
        FileCacheMode.tmp_dir,
        FileCacheMode.close_file,
        FileCacheMode.cloudpath_object,
    ]:
        self.clear_cache()

        if self._local_cache_dir.exists():
            self._local_cache_dir.rmdir()

@classmethod
def get_default_client(cls) -> "Client":
Expand All @@ -63,6 +105,17 @@ def set_as_default_client(self) -> None:
def CloudPath(self, cloud_path: Union[str, BoundedCloudPath]) -> BoundedCloudPath:
return self._cloud_meta.path_class(cloud_path=cloud_path, client=self) # type: ignore

def clear_cache(self):
    """Clears the contents of the cache folder.
    Does not remove folder so it can keep being written to.

    Real subdirectories are removed recursively. Everything else, including
    symbolic links (even ones pointing at directories or at nothing), is
    unlinked without following the link.
    """
    cache_dir = self._local_cache_dir
    if not cache_dir.exists():
        return

    for entry in cache_dir.iterdir():
        # `shutil.rmtree` raises on symlinks, so only recurse into real directories
        if entry.is_dir() and not entry.is_symlink():
            shutil.rmtree(entry)
        else:
            entry.unlink()

@abc.abstractmethod
def _download_file(
self, cloud_path: BoundedCloudPath, local_path: Union[str, os.PathLike]
Expand Down
61 changes: 51 additions & 10 deletions cloudpathlib/cloudpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
_posix_flavour,
_PathParents,
)
import shutil
from typing import (
overload,
Any,
Expand All @@ -33,6 +34,8 @@
from urllib.parse import urlparse
from warnings import warn

from cloudpathlib.enums import FileCacheMode

from . import anypath

from .exceptions import (
Expand Down Expand Up @@ -209,6 +212,13 @@ def __del__(self) -> None:
if self._handle is not None:
self._handle.close()

# ensure file removed from cache when cloudpath object deleted
if (
hasattr(self, "client")
and self.client.file_cache_mode == FileCacheMode.cloudpath_object
):
self.clear_cache()

def __getstate__(self) -> Dict[str, Any]:
state = self.__dict__.copy()

Expand Down Expand Up @@ -469,12 +479,17 @@ def open(
# write modes need special on closing the buffer
if any(m in mode for m in ("w", "+", "x", "a")):
# dirty, handle, patch close
original_close = buffer.close
wrapped_close = buffer.close

# since we are pretending this is a cloud file, upload it to the cloud
# when the buffer is closed
def _patched_close(*args, **kwargs) -> None:
original_close(*args, **kwargs)
def _patched_close_upload(*args, **kwargs) -> None:
wrapped_close(*args, **kwargs)

# we should be idempotent and not upload again if
# we already ran our close method patch
if not self._dirty:
return

# original mtime should match what was in the cloud; because of system clocks or rounding
# by the cloud provider, the new version in our cache is "older" than the original version;
Expand All @@ -484,15 +499,31 @@ def _patched_close(*args, **kwargs) -> None:
os.utime(self._local, times=(new_mtime, new_mtime))

self._upload_local_to_cloud(force_overwrite_to_cloud=force_overwrite_to_cloud)
self._dirty = False

buffer.close = _patched_close # type: ignore
buffer.close = _patched_close_upload # type: ignore

# keep reference in case we need to close when __del__ is called on this object
self._handle = buffer

# opened for write, so mark dirty
self._dirty = True

# if we don't want any cache around, remove the cache
# as soon as the file is closed
if self.client.file_cache_mode == FileCacheMode.close_file:
# this may be _patched_close_upload, in which case we need to
# make sure to call that first so the file gets uploaded
wrapped_close_for_cache = buffer.close

def _patched_close_empty_cache(*args, **kwargs):
wrapped_close_for_cache(*args, **kwargs)

# remove local file as last step on closing
self.clear_cache()

buffer.close = _patched_close_empty_cache # type: ignore

return buffer

def replace(self: DerivedCloudPath, target: DerivedCloudPath) -> DerivedCloudPath:
Expand Down Expand Up @@ -574,6 +605,14 @@ def write_text(
with self.open(mode="w", encoding=encoding, errors=errors) as f:
return f.write(data)

def read_bytes(self) -> bytes:
    """Return the file's full contents as bytes, read via `self.open` in "rb" mode."""
    with self.open(mode="rb") as handle:
        contents = handle.read()
    return contents

def read_text(self, encoding: Optional[str] = None, errors: Optional[str] = None) -> str:
    """Return the file's full contents as text, read via `self.open` in "r" mode.

    `encoding` and `errors` are passed through to `self.open` unchanged.
    """
    with self.open(mode="r", encoding=encoding, errors=errors) as handle:
        text = handle.read()
    return text

# ====================== DISPATCHED TO POSIXPATH FOR PURE PATHS ======================
# Methods that are dispatched to exactly how pathlib.PurePosixPath would calculate it on
# self._path for pure paths (does not matter if file exists);
Expand Down Expand Up @@ -726,12 +765,6 @@ def stat(self) -> os.stat_result:
)
return self._dispatch_to_local_cache_path("stat")

def read_bytes(self) -> bytes:
return self._dispatch_to_local_cache_path("read_bytes")

def read_text(self, encoding: Optional[str] = None, errors: Optional[str] = None) -> str:
return self._dispatch_to_local_cache_path("read_text", encoding, errors)

# =========== public cloud methods, not in pathlib ===============
def download_to(self, destination: Union[str, os.PathLike]) -> Path:
destination = Path(destination)
Expand Down Expand Up @@ -917,6 +950,14 @@ def copytree(self, destination, force_overwrite_to_cloud=False, ignore=None):

return destination

def clear_cache(self):
    """Removes cache if it exists

    A real directory is removed recursively. A file or a symbolic link
    (including a broken one) is unlinked without following the link.
    """
    local = self._local

    # `shutil.rmtree` raises on symlinks, so only recurse into a real directory
    if local.is_dir() and not local.is_symlink():
        shutil.rmtree(local)
    elif local.exists() or local.is_symlink():
        # `exists()` is False for a broken symlink, hence the extra check
        local.unlink()

# =========== private cloud methods ===============
@property
def _local(self) -> Path:
Expand Down
43 changes: 43 additions & 0 deletions cloudpathlib/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from enum import Enum
import os
from typing import Optional


class FileCacheMode(str, Enum):
    """Enumeration of the modes available for the cloudpathlib file cache.

    Attributes:
        persistent (str): Cache is not removed by `cloudpathlib`.
        tmp_dir (str): Cache is stored in a
            [`TemporaryDirectory`](https://docs.python.org/3/library/tempfile.html#tempfile.TemporaryDirectory)
            which is removed when the Client object is garbage collected (or by the OS at some point if not).
        cloudpath_object (str): Cache for a `CloudPath` object is removed when `__del__` for that object is
            called by Python garbage collection.
        close_file (str): Cache for a `CloudPath` file is removed as soon as the file is closed. Note: you must
            use `CloudPath.open` whenever opening the file for this method to function.

    Modes can be set by passing them to the Client or by setting the `CLOUDPATHLIB_FILE_CACHE_MODE`
    environment variable. The misspelled name `CLOUPATHLIB_FILE_CACHE_MODE`, which this class
    previously read, is still honored as a fallback.

    For more detail, see the [caching documentation page](../../caching).
    """

    persistent = "persistent"  # cache stays as long as dir on OS does
    tmp_dir = "tmp_dir"  # DEFAULT: handled by deleting client, Python, or OS (usually on machine restart)
    cloudpath_object = "cloudpath_object"  # __del__ called on the CloudPath object
    close_file = "close_file"  # cache is cleared when file is closed

    @classmethod
    def from_environment(cls) -> Optional["FileCacheMode"]:
        """Parses the environment variable `CLOUDPATHLIB_FILE_CACHE_MODE` into
        an instance of this Enum.

        The misspelled legacy name `CLOUPATHLIB_FILE_CACHE_MODE` is consulted only
        when the correctly spelled variable is unset or blank. Values are matched
        case-insensitively and surrounding whitespace is ignored.

        Returns:
            FileCacheMode enum value if the env var is defined, else None.

        Raises:
            ValueError: If the variable is set to a string that is not a valid mode.
        """
        # correctly spelled name first, then the legacy misspelling for backwards compatibility
        for var_name in ("CLOUDPATHLIB_FILE_CACHE_MODE", "CLOUPATHLIB_FILE_CACHE_MODE"):
            env_string = os.environ.get(var_name, "").strip().lower()
            if env_string:
                return cls(env_string)

        return None
4 changes: 4 additions & 0 deletions cloudpathlib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class InvalidPrefixError(CloudPathException, ValueError):
pass


class InvalidConfigurationException(CloudPathException, ValueError):
    """Raised when configuration options are invalid or inconsistent with each other.

    For example, `Client` raises it when the `persistent` file cache mode is
    requested without a `local_cache_dir`.
    """


class MissingCredentialsError(CloudPathException):
pass

Expand Down
12 changes: 10 additions & 2 deletions cloudpathlib/gs/gsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from pathlib import Path, PurePosixPath
from typing import Any, Callable, Dict, Iterable, Optional, TYPE_CHECKING, Tuple, Union


from ..client import Client, register_client_class
from ..cloudpath import implementation_registry
from ..enums import FileCacheMode
from .gspath import GSPath

try:
Expand Down Expand Up @@ -36,6 +36,7 @@ def __init__(
credentials: Optional["Credentials"] = None,
project: Optional[str] = None,
storage_client: Optional["StorageClient"] = None,
file_cache_mode: Optional[Union[str, FileCacheMode]] = None,
local_cache_dir: Optional[Union[str, os.PathLike]] = None,
content_type_method: Optional[Callable] = mimetypes.guess_type,
):
Expand Down Expand Up @@ -67,6 +68,9 @@ def __init__(
https://googleapis.dev/python/storage/latest/client.html).
storage_client (Optional[StorageClient]): Instantiated [`StorageClient`](
https://googleapis.dev/python/storage/latest/client.html).
file_cache_mode (Optional[Union[str, FileCacheMode]]): How often to clear the file cache; see
[the caching docs](https://cloudpathlib.drivendata.org/stable/caching/) for more information
about the options in cloudpathlib.enums.FileCacheMode.
local_cache_dir (Optional[Union[str, os.PathLike]]): Path to directory to use as cache
for downloaded files. If None, will use a temporary directory.
content_type_method (Optional[Callable]): Function to call to guess media type (mimetype) when
Expand All @@ -87,7 +91,11 @@ def __init__(
except DefaultCredentialsError:
self.client = StorageClient.create_anonymous_client()

super().__init__(local_cache_dir=local_cache_dir, content_type_method=content_type_method)
super().__init__(
local_cache_dir=local_cache_dir,
content_type_method=content_type_method,
file_cache_mode=file_cache_mode,
)

def _get_metadata(self, cloud_path: GSPath) -> Optional[Dict[str, Any]]:
bucket = self.client.bucket(cloud_path.bucket)
Expand Down
Loading