Skip to content

Commit

Permalink
Implement fine-grained file cache control options (#314)
Browse files Browse the repository at this point in the history
* Implementation of cache options

* Cache tests

* Cache mode documentation

* changelog and version bump

* Update tests

* Hide input output for jupyter cells

* cleanup client in any non-persistent cahce mode

* simplify test suite; add missing tests

* stabailize test

* configuration changes

* autoformat notebooks

* Add nbautoexport for docs

* Update docs

* Reorder kwargs

* Update typos in caching docs

* Add feedback to docs

* Update docs/docs/script/caching.py

Co-authored-by: Jay Qi <[email protected]>

* update notebook as well

---------

Co-authored-by: Jay Qi <[email protected]>
  • Loading branch information
pjbull and jayqi authored Feb 8, 2023
1 parent bb9d0a3 commit 088fe49
Show file tree
Hide file tree
Showing 26 changed files with 1,626 additions and 76 deletions.
4 changes: 4 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# cloudpathlib Changelog

## v0.13.0 (UNRLEASED)

- Implement `file_cache_mode`s to give users finer-grained control over when and how the cache is cleared. ([Issue #10](https://github.com/drivendataorg/cloudpathlib/issues/10), [PR #314](https://github.com/drivendataorg/cloudpathlib/pull/314))

## v0.12.1 (2023-01-04)

- Fix glob logic for buckets; add regression test; add error on globbing all buckets ([Issue #311](https://github.com/drivendataorg/cloudpathlib/issues/311), [PR #312](https://github.com/drivendataorg/cloudpathlib/pull/312))
Expand Down
11 changes: 10 additions & 1 deletion cloudpathlib/azure/azblobclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from ..client import Client, register_client_class
from ..cloudpath import implementation_registry
from ..enums import FileCacheMode
from ..exceptions import MissingCredentialsError
from .azblobpath import AzureBlobPath

Expand All @@ -32,6 +33,7 @@ def __init__(
credential: Optional[Any] = None,
connection_string: Optional[str] = None,
blob_service_client: Optional["BlobServiceClient"] = None,
file_cache_mode: Optional[Union[str, FileCacheMode]] = None,
local_cache_dir: Optional[Union[str, os.PathLike]] = None,
content_type_method: Optional[Callable] = mimetypes.guess_type,
):
Expand Down Expand Up @@ -68,12 +70,19 @@ def __init__(
https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python#copy-your-credentials-from-the-azure-portal).
blob_service_client (Optional[BlobServiceClient]): Instantiated [`BlobServiceClient`](
https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobserviceclient?view=azure-python).
file_cache_mode (Optional[Union[str, FileCacheMode]]): How often to clear the file cache; see
[the caching docs](https://cloudpathlib.drivendata.org/stable/caching/) for more information
about the options in cloudpathlib.eums.FileCacheMode.
local_cache_dir (Optional[Union[str, os.PathLike]]): Path to directory to use as cache
for downloaded files. If None, will use a temporary directory.
content_type_method (Optional[Callable]): Function to call to guess media type (mimetype) when
writing a file to the cloud. Defaults to `mimetypes.guess_type`. Must return a tuple (content type, content encoding).
"""
super().__init__(local_cache_dir=local_cache_dir, content_type_method=content_type_method)
super().__init__(
local_cache_dir=local_cache_dir,
content_type_method=content_type_method,
file_cache_mode=file_cache_mode,
)

if connection_string is None:
connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING", None)
Expand Down
61 changes: 57 additions & 4 deletions cloudpathlib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
import mimetypes
import os
from pathlib import Path
import shutil
from tempfile import TemporaryDirectory
from typing import Generic, Callable, Iterable, Optional, Tuple, TypeVar, Union

from .cloudpath import CloudImplementation, CloudPath, implementation_registry
from .enums import FileCacheMode
from .exceptions import InvalidConfigurationException


BoundedCloudPath = TypeVar("BoundedCloudPath", bound=CloudPath)

Expand All @@ -28,23 +32,61 @@ class Client(abc.ABC, Generic[BoundedCloudPath]):

def __init__(
self,
file_cache_mode: Optional[Union[str, FileCacheMode]] = None,
local_cache_dir: Optional[Union[str, os.PathLike]] = None,
content_type_method: Optional[Callable] = mimetypes.guess_type,
):
self.file_cache_mode = None
self._cache_tmp_dir = None
self._cloud_meta.validate_completeness()
# setup caching and local versions of file and track if it is a tmp dir

# convert strings passed to enum
if isinstance(file_cache_mode, str):
file_cache_mode = FileCacheMode(file_cache_mode)

# if not explcitly passed to client, get from env var
if file_cache_mode is None:
file_cache_mode = FileCacheMode.from_environment()

# explicitly passing a cache dir, so we set to persistent
# unless user explicitly passes a different file cache mode
if local_cache_dir and file_cache_mode is None:
file_cache_mode = FileCacheMode.persistent

if file_cache_mode == FileCacheMode.persistent and local_cache_dir is None:
raise InvalidConfigurationException(
f"If you use the '{FileCacheMode.persistent}' cache mode, you must pass a `local_cache_dir` when you instantiate the client."
)

# if no explicit local dir, setup caching in temporary dir
if local_cache_dir is None:
self._cache_tmp_dir = TemporaryDirectory()
local_cache_dir = self._cache_tmp_dir.name

if file_cache_mode is None:
file_cache_mode = FileCacheMode.tmp_dir

self._local_cache_dir = Path(local_cache_dir)
self.content_type_method = content_type_method

# Fallback: if not set anywhere, default to tmp_dir (for backwards compatibility)
if file_cache_mode is None:
file_cache_mode = FileCacheMode.tmp_dir

self.file_cache_mode = file_cache_mode

def __del__(self) -> None:
# make sure temp is cleaned up if we created it
if self._cache_tmp_dir is not None:
self._cache_tmp_dir.cleanup()
# remove containing dir, even if a more aggressive strategy
# removed the actual files
if self.file_cache_mode in [
FileCacheMode.tmp_dir,
FileCacheMode.close_file,
FileCacheMode.cloudpath_object,
]:
self.clear_cache()

if self._local_cache_dir.exists():
self._local_cache_dir.rmdir()

@classmethod
def get_default_client(cls) -> "Client":
Expand All @@ -63,6 +105,17 @@ def set_as_default_client(self) -> None:
def CloudPath(self, cloud_path: Union[str, BoundedCloudPath]) -> BoundedCloudPath:
return self._cloud_meta.path_class(cloud_path=cloud_path, client=self) # type: ignore

def clear_cache(self):
"""Clears the contents of the cache folder.
Does not remove folder so it can keep being written to.
"""
if self._local_cache_dir.exists():
for p in self._local_cache_dir.iterdir():
if p.is_file():
p.unlink()
else:
shutil.rmtree(p)

@abc.abstractmethod
def _download_file(
self, cloud_path: BoundedCloudPath, local_path: Union[str, os.PathLike]
Expand Down
61 changes: 51 additions & 10 deletions cloudpathlib/cloudpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
_posix_flavour,
_PathParents,
)
import shutil
from typing import (
overload,
Any,
Expand All @@ -33,6 +34,8 @@
from urllib.parse import urlparse
from warnings import warn

from cloudpathlib.enums import FileCacheMode

from . import anypath

from .exceptions import (
Expand Down Expand Up @@ -209,6 +212,13 @@ def __del__(self) -> None:
if self._handle is not None:
self._handle.close()

# ensure file removed from cache when cloudpath object deleted
if (
hasattr(self, "client")
and self.client.file_cache_mode == FileCacheMode.cloudpath_object
):
self.clear_cache()

def __getstate__(self) -> Dict[str, Any]:
state = self.__dict__.copy()

Expand Down Expand Up @@ -469,12 +479,17 @@ def open(
# write modes need special on closing the buffer
if any(m in mode for m in ("w", "+", "x", "a")):
# dirty, handle, patch close
original_close = buffer.close
wrapped_close = buffer.close

# since we are pretending this is a cloud file, upload it to the cloud
# when the buffer is closed
def _patched_close(*args, **kwargs) -> None:
original_close(*args, **kwargs)
def _patched_close_upload(*args, **kwargs) -> None:
wrapped_close(*args, **kwargs)

# we should be idempotent and not upload again if
# we already ran our close method patch
if not self._dirty:
return

# original mtime should match what was in the cloud; because of system clocks or rounding
# by the cloud provider, the new version in our cache is "older" than the original version;
Expand All @@ -484,15 +499,31 @@ def _patched_close(*args, **kwargs) -> None:
os.utime(self._local, times=(new_mtime, new_mtime))

self._upload_local_to_cloud(force_overwrite_to_cloud=force_overwrite_to_cloud)
self._dirty = False

buffer.close = _patched_close # type: ignore
buffer.close = _patched_close_upload # type: ignore

# keep reference in case we need to close when __del__ is called on this object
self._handle = buffer

# opened for write, so mark dirty
self._dirty = True

# if we don't want any cache around, remove the cache
# as soon as the file is closed
if self.client.file_cache_mode == FileCacheMode.close_file:
# this may be _patched_close_upload, in which case we need to
# make sure to call that first so the file gets uploaded
wrapped_close_for_cache = buffer.close

def _patched_close_empty_cache(*args, **kwargs):
wrapped_close_for_cache(*args, **kwargs)

# remove local file as last step on closing
self.clear_cache()

buffer.close = _patched_close_empty_cache # type: ignore

return buffer

def replace(self: DerivedCloudPath, target: DerivedCloudPath) -> DerivedCloudPath:
Expand Down Expand Up @@ -574,6 +605,14 @@ def write_text(
with self.open(mode="w", encoding=encoding, errors=errors) as f:
return f.write(data)

def read_bytes(self) -> bytes:
with self.open(mode="rb") as f:
return f.read()

def read_text(self, encoding: Optional[str] = None, errors: Optional[str] = None) -> str:
with self.open(mode="r", encoding=encoding, errors=errors) as f:
return f.read()

# ====================== DISPATCHED TO POSIXPATH FOR PURE PATHS ======================
# Methods that are dispatched to exactly how pathlib.PurePosixPath would calculate it on
# self._path for pure paths (does not matter if file exists);
Expand Down Expand Up @@ -726,12 +765,6 @@ def stat(self) -> os.stat_result:
)
return self._dispatch_to_local_cache_path("stat")

def read_bytes(self) -> bytes:
return self._dispatch_to_local_cache_path("read_bytes")

def read_text(self, encoding: Optional[str] = None, errors: Optional[str] = None) -> str:
return self._dispatch_to_local_cache_path("read_text", encoding, errors)

# =========== public cloud methods, not in pathlib ===============
def download_to(self, destination: Union[str, os.PathLike]) -> Path:
destination = Path(destination)
Expand Down Expand Up @@ -917,6 +950,14 @@ def copytree(self, destination, force_overwrite_to_cloud=False, ignore=None):

return destination

def clear_cache(self):
"""Removes cache if it exists"""
if self._local.exists():
if self._local.is_file():
self._local.unlink()
else:
shutil.rmtree(self._local)

# =========== private cloud methods ===============
@property
def _local(self) -> Path:
Expand Down
43 changes: 43 additions & 0 deletions cloudpathlib/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from enum import Enum
import os
from typing import Optional


class FileCacheMode(str, Enum):
"""Enumeration of the modes available for for the cloudpathlib file cache.
Attributes:
persistent (str): Cache is not removed by `cloudpathlib`.
tmp_dir (str): Cache is stored in a
[`TemporaryDirectory`](https://docs.python.org/3/library/tempfile.html#tempfile.TemporaryDirectory)
which is removed when the Client object is garbage collected (or by the OS at some point if not).
cloudpath_object (str): Cache for a `CloudPath` object is removed when `__del__` for that object is
called by Python garbage collection.
close_file (str): Cache for a `CloudPath` file is removed as soon as the file is closed. Note: you must
use `CloudPath.open` whenever opening the file for this method to function.
Modes can be set by passing them to the Client or by setting the `CLOUPATHLIB_FILE_CACHE_MODE`
environment variable.
For more detail, see the [caching documentation page](../../caching).
"""

persistent = "persistent" # cache stays as long as dir on OS does
tmp_dir = "tmp_dir" # DEFAULT: handled by deleting client, Python, or OS (usually on machine restart)
cloudpath_object = "cloudpath_object" # __del__ called on the CloudPath object
close_file = "close_file" # cache is cleared when file is closed

@classmethod
def from_environment(cls) -> Optional["FileCacheMode"]:
"""Parses the environment variable `CLOUPATHLIB_FILE_CACHE_MODE` into
an instance of this Enum.
Returns:
FileCacheMode enum value if the env var is defined, else None.
"""
env_string = os.environ.get("CLOUPATHLIB_FILE_CACHE_MODE", "").lower()

if not env_string:
return None
else:
return cls(env_string)
4 changes: 4 additions & 0 deletions cloudpathlib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class InvalidPrefixError(CloudPathException, ValueError):
pass


class InvalidConfigurationException(CloudPathException, ValueError):
pass


class MissingCredentialsError(CloudPathException):
pass

Expand Down
12 changes: 10 additions & 2 deletions cloudpathlib/gs/gsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from pathlib import Path, PurePosixPath
from typing import Any, Callable, Dict, Iterable, Optional, TYPE_CHECKING, Tuple, Union


from ..client import Client, register_client_class
from ..cloudpath import implementation_registry
from ..enums import FileCacheMode
from .gspath import GSPath

try:
Expand Down Expand Up @@ -36,6 +36,7 @@ def __init__(
credentials: Optional["Credentials"] = None,
project: Optional[str] = None,
storage_client: Optional["StorageClient"] = None,
file_cache_mode: Optional[Union[str, FileCacheMode]] = None,
local_cache_dir: Optional[Union[str, os.PathLike]] = None,
content_type_method: Optional[Callable] = mimetypes.guess_type,
):
Expand Down Expand Up @@ -67,6 +68,9 @@ def __init__(
https://googleapis.dev/python/storage/latest/client.html).
storage_client (Optional[StorageClient]): Instantiated [`StorageClient`](
https://googleapis.dev/python/storage/latest/client.html).
file_cache_mode (Optional[Union[str, FileCacheMode]]): How often to clear the file cache; see
[the caching docs](https://cloudpathlib.drivendata.org/stable/caching/) for more information
about the options in cloudpathlib.eums.FileCacheMode.
local_cache_dir (Optional[Union[str, os.PathLike]]): Path to directory to use as cache
for downloaded files. If None, will use a temporary directory.
content_type_method (Optional[Callable]): Function to call to guess media type (mimetype) when
Expand All @@ -87,7 +91,11 @@ def __init__(
except DefaultCredentialsError:
self.client = StorageClient.create_anonymous_client()

super().__init__(local_cache_dir=local_cache_dir, content_type_method=content_type_method)
super().__init__(
local_cache_dir=local_cache_dir,
content_type_method=content_type_method,
file_cache_mode=file_cache_mode,
)

def _get_metadata(self, cloud_path: GSPath) -> Optional[Dict[str, Any]]:
bucket = self.client.bucket(cloud_path.bucket)
Expand Down
Loading

0 comments on commit 088fe49

Please sign in to comment.