Skip to content

Commit

Permalink
Merge pull request #11 from Parquery/snaji/update-gswrap-client
Browse files Browse the repository at this point in the history
updated gswrap
  • Loading branch information
slimeth authored Jan 21, 2019
2 parents b6e4b7d + 5f5670a commit eb8026a
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 57 deletions.
9 changes: 2 additions & 7 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,19 @@ created. This one uses internally the `Storage Client
<https://googleapis.github.io/google-cloud-python/latest/storage/client.html#google.cloud.storage.client.Client/>`_
from ``google-cloud-storage``.

Two parameters can be passed to the client:
One parameter can be passed to the client:

The Google Cloud Storage **project** which the client acts on behalf of. It will
be passed when creating the internal client. If not passed, falls back to the
default inferred from the locally authenticated `Google Cloud SDK
<http://cloud.google.com/sdk>`_ environment. Each project needs a separate
client. Operations between two different projects are not supported.

The Google Cloud Storage **bucket_name** which the client acts on behalf of.
It can be either passed when creating the client or will be retrieved from the
first command accessing the project. The library automatically changes the
bucket when trying to access another bucket in the same project.

.. code-block:: python
import gswrap
client = gswrap.Client() # project and bucket_name are optional
client = gswrap.Client() # project is optional
List objects in your bucket
---------------------------
Expand Down
26 changes: 13 additions & 13 deletions benchmark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def benchmark_list_many_files(self) -> None:
file.write_text("text")
try:
_setup(url=self.url_prefix, path=tmp_dir.path)
client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
time_gswrap = timer(
client.ls, url=self.url_prefix, recursive=True)

Expand All @@ -205,7 +205,7 @@ def benchmark_upload_many_files(self) -> None:
file = tmp_dir.path / "file{}".format(index)
file.write_text("text")
try:
client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
time_gswrap = timer(
_gswrap_cp,
src=tmp_dir.path,
Expand Down Expand Up @@ -234,7 +234,7 @@ def benchmark_upload_many_single_files(self) -> None:
file = tmp_dir.path / "file{}".format(index)
file.write_text("text")
try:
client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
time_gswrap = 0.0
for file in tmp_dir.path.iterdir():
time_gswrap += timer(
Expand Down Expand Up @@ -268,7 +268,7 @@ def benchmark_upload_big_files(self) -> None:
file = tmp_dir.path / "file{}".format(index)
file.write_text("a" * size)
try:
client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
time_gswrap = timer(
_gswrap_cp,
src=tmp_dir.path,
Expand Down Expand Up @@ -297,7 +297,7 @@ def benchmark_upload_many_to_many(self) -> None:
file.parent.mkdir(parents=True, exist_ok=True)
file.write_text("text")
try:
client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
srcs_dsts = _upload_many_to_many_local_ls(
src=tmp_dir.path.as_posix(),
dst=self.url_prefix + '/gswrap')
Expand Down Expand Up @@ -338,7 +338,7 @@ def benchmark_download_many_files(self) -> None:

gswrap_dir = tmp_dir.path / "gswrap"
gswrap_dir.mkdir()
client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
time_gswrap = timer(
_gswrap_cp,
client=client,
Expand All @@ -365,7 +365,7 @@ def benchmark_download_many_to_many(self) -> None:

gswrap_dir = tmp_dir.path / "gswrap"
gswrap_dir.mkdir()
client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
srcs_dsts = _gswrap_list_for_cp_many_to_many(
client=client,
src=self.url_prefix,
Expand Down Expand Up @@ -411,7 +411,7 @@ def benchmark_copy_many_files_on_remote(self) -> None:
src=self.url_prefix,
dst=copy_url + "/gsutil")

client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
time_gswrap = timer(
_gswrap_cp,
client=client,
Expand Down Expand Up @@ -439,7 +439,7 @@ def benchmark_copy_many_to_many_on_remote(self) -> None:
try:
_setup(url=self.url_prefix, path=tmp_dir.path)

client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
srcs_dsts = _gswrap_list_for_cp_many_to_many(
client=client,
src=self.url_prefix,
Expand Down Expand Up @@ -483,7 +483,7 @@ def benchmark_rm_many(self) -> None:

_setup(url=self.url_prefix, path=tmp_dir.path)

client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
time_gswrap = timer(
client.rm,
url=self.url_prefix,
Expand Down Expand Up @@ -511,7 +511,7 @@ def benchmark_read_many(self) -> None:
for url in urls:
time_gsutilwrap += timer(gsutilwrap.read_text, url=url)

client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
time_gswrap = 0.0
urls = client.ls(url=self.url_prefix, recursive=True)
for url in urls:
Expand All @@ -536,7 +536,7 @@ def benchmark_write_many(self) -> None:
text="hello",
quiet=True)

client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
time_gswrap = 0.0
for index in range(testcase):
time_gswrap += timer(
Expand Down Expand Up @@ -567,7 +567,7 @@ def benchmark_stat_many(self) -> None:
for url in urls:
time_gsutilwrap += timer(gsutilwrap.stat, url=url)

client = gswrap.Client(bucket_name=self.bucket)
client = gswrap.Client()
time_gswrap = 0.0
urls = client.ls(url=self.url_prefix, recursive=True)
for url in urls:
Expand Down
49 changes: 23 additions & 26 deletions gswrap/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
"""Wrap gCloud Storage API for simpler and multithreaded handling of objects."""
"""Wrap Google Cloud Storage API for multi-threaded data manipulation."""

# pylint: disable=protected-access
# pylint: disable=too-many-lines
Expand Down Expand Up @@ -328,27 +328,24 @@ def _download_to_path(blob: google.cloud.storage.blob.Blob,
class Client:
"""Google Cloud Storage Client for simple usage of gsutil commands."""

def __init__(self,
bucket_name: Optional[str] = None,
project: Optional[str] = None) -> None:
def __init__(self, project: Optional[str] = None) -> None:
"""
Initialize.
:param bucket_name: name of the active bucket
:param project:
the project which the client acts on behalf of. Will
be passed when creating a topic. If None, falls back to the
default inferred from the environment.
The Google Cloud Storage project which the client acts on behalf of.
It will be passed when creating the internal client. If not passed,
falls back to the default inferred from the locally authenticated
Google Cloud SDK (http://cloud.google.com/sdk) environment. Each
project needs a separate client. Operations between two different
projects are not supported.
"""
if project is not None:
self._client = google.cloud.storage.Client(project=project)
else:
self._client = google.cloud.storage.Client()

if bucket_name:
self._bucket = self._client.get_bucket(bucket_name=bucket_name)
else:
self._bucket = None
self._bucket = None # type: google.cloud.storage.Bucket

def _change_bucket(self, bucket_name: str) -> None:
"""
Expand Down Expand Up @@ -508,7 +505,7 @@ def cp(self,
When specified, existing files or objects at the destination will
not be overwritten.
:param multithreaded:
if set to False the copy will be performed non-multithreaded.
if set to False the copy will be performed single-threaded.
If set to True it will use multiple threads to perform the copy.
:param preserve_posix:
(from https://cloud.google.com/storage/docs/gsutil/commands/cp)
Expand Down Expand Up @@ -589,10 +586,10 @@ def _cp(self,
:param recursive: if true also copy files within folders
:param no_clobber: if true don't overwrite files which already exist
:param multithreaded:
if set to False the copy will be performed non-multithreaded.
if set to False the copy will be performed single-threaded.
If set to True it will use multiple threads to perform the copy.
"""
# None is ThreadPoolExecutor max_workers default. 1 is non-multithreaded
# None is ThreadPoolExecutor max_workers default. 1 is single-threaded
max_workers = None if multithreaded else 1

futures = [] # type: List[concurrent.futures.Future]
Expand Down Expand Up @@ -666,7 +663,7 @@ def _upload(self,
:param recursive: if true also upload files within folders
:param no_clobber: if true don't overwrite files which already exist
:param multithreaded:
if set to False the upload will be performed non-multithreaded.
if set to False the upload will be performed single-threaded.
If set to True it will use multiple threads to perform the upload.
"""
upload_files = [] # type: List[str]
Expand Down Expand Up @@ -694,7 +691,7 @@ def _upload(self,
self._change_bucket(bucket_name=dst.bucket)
bucket = self._bucket

# None is ThreadPoolExecutor max_workers default. 1 is non-multithreaded
# None is ThreadPoolExecutor max_workers default. 1 is single-threaded
max_workers = None if multithreaded else 1
futures = [] # type: List[concurrent.futures.Future]
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) \
Expand Down Expand Up @@ -756,7 +753,7 @@ def _download(self,
:param recursive: if True also download files within folders
:param no_clobber: if True don't overwrite files which already exist
:param multithreaded:
if set to False the download will be performed non-multithreaded.
if set to False the download will be performed single-threaded.
If set to True it will use multiple threads to perform the download.
"""
src_prefix_parent = pathlib.Path(src.prefix)
Expand Down Expand Up @@ -786,7 +783,7 @@ def _download(self,
blob_iterator = bucket.list_blobs(
prefix=src_prefix, delimiter=delimiter)

# None is ThreadPoolExecutor max_workers default. 1 is non-multithreaded
# None is ThreadPoolExecutor max_workers default. 1 is single-threaded
max_workers = None if multithreaded else 1
futures = [] # type: List[concurrent.futures.Future]
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) \
Expand Down Expand Up @@ -845,7 +842,7 @@ def cp_many_to_many(
When specified, existing files or objects at the destination will
not be overwritten.
:param multithreaded:
if set to False the copy will be performed non-multithreaded.
if set to False the copy will be performed single-threaded.
If set to True it will use multiple threads to perform the copy.
:param preserve_posix:
(from https://cloud.google.com/storage/docs/gsutil/commands/cp)
Expand All @@ -856,7 +853,7 @@ def cp_many_to_many(
access/modification time of the file. POSIX attributes are always
preserved when blob is copied on google cloud storage.
"""
# None is ThreadPoolExecutor max_workers default. 1 is non-multithreaded
# None is ThreadPoolExecutor max_workers default. 1 is single-threaded
max_workers = None if multithreaded else 1
futures = [] # type: List[concurrent.futures.Future]
with concurrent.futures.ThreadPoolExecutor(
Expand Down Expand Up @@ -887,7 +884,7 @@ def rm(self, url: str, recursive: bool = False,
:param url: google cloud storage URL
:param recursive: if True remove files within folders
:param multithreaded:
if set to False the remove will be performed non-multithreaded.
if set to False the remove will be performed single-threaded.
If set to True it will use multiple threads to perform the remove.
"""
rm_url = resource_type(res_loc=url)
Expand Down Expand Up @@ -923,7 +920,7 @@ def rm(self, url: str, recursive: bool = False,
prefix=gcs_url_prefix, delimiter=delimiter)

# None is ThreadPoolExecutor max_workers default.
# 1 is non-multithreaded
# 1 is single-threaded
max_workers = None if multithreaded else 1
futures = [] # type: List[concurrent.futures.Future]
with concurrent.futures.ThreadPoolExecutor(
Expand Down Expand Up @@ -1008,7 +1005,7 @@ def write_text(self, url: str, text: str, encoding: str = 'utf-8'):
@icontract.require(lambda url: not contains_wildcard(prefix=url))
def stat(self, url: str) -> Optional[Stat]:
"""
Retrieve that stat of the object in the Google Cloud Storage.
Retrieve the stat of the object in the Google Cloud Storage.
:param url: to the object
:return: object status,
Expand Down Expand Up @@ -1121,14 +1118,14 @@ def md5_hexdigests(self, urls: List[str], multithreaded: bool = False) \
:param urls: URLs to stat and retrieve MD5 of
:param multithreaded:
if set to False the retrieving hex digests of md5 checksums will
be performed non-multithreaded.
be performed single-threaded.
If set to True it will use multiple threads to perform this.
:return: list of hexdigests;
if an URL does not exist, the corresponding item is None.
"""
hexdigests = [] # type: List[Optional[str]]

# None is ThreadPoolExecutor max_workers default. 1 is non-multithreaded
# None is ThreadPoolExecutor max_workers default. 1 is single-threaded
max_workers = None if multithreaded else 1
stat_futures = [] # type: List[concurrent.futures.Future]
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) \
Expand Down
6 changes: 4 additions & 2 deletions tests/test_live_cp_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

class TestCPDownload(unittest.TestCase):
def setUp(self):
self.client = gswrap.Client(bucket_name=tests.common.TEST_GCS_BUCKET)
self.client = gswrap.Client()
self.client._change_bucket(tests.common.TEST_GCS_BUCKET)
self.bucket_prefix = str(uuid.uuid4())
self.tmp_dir = tempfile.TemporaryDirectory()
tests.common.gcs_test_setup(
Expand Down Expand Up @@ -291,7 +292,8 @@ def test_download_clobber(self):

class TestCPDownloadNoCommonSetup(unittest.TestCase):
def setUp(self):
self.client = gswrap.Client(bucket_name=tests.common.TEST_GCS_BUCKET)
self.client = gswrap.Client()
self.client._change_bucket(tests.common.TEST_GCS_BUCKET)
self.bucket_prefix = str(uuid.uuid4())

def tearDown(self):
Expand Down
3 changes: 2 additions & 1 deletion tests/test_live_cp_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

class TestCPLocal(unittest.TestCase):
def setUp(self):
self.client = gswrap.Client(bucket_name=tests.common.TEST_GCS_BUCKET)
self.client = gswrap.Client()
self.client._change_bucket(tests.common.TEST_GCS_BUCKET)
self.tmp_dir = tempfile.TemporaryDirectory()
self.local_dir = pathlib.Path(self.tmp_dir.name) / str(uuid.uuid4())
self.local_dir.mkdir()
Expand Down
3 changes: 2 additions & 1 deletion tests/test_live_cp_many_to_many.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

class TestCPManyToMany(unittest.TestCase):
def setUp(self):
self.client = gswrap.Client(bucket_name=tests.common.TEST_GCS_BUCKET)
self.client = gswrap.Client()
self.client._change_bucket(tests.common.TEST_GCS_BUCKET)
self.bucket_prefix = str(uuid.uuid4())
self.tmp_dir = tempfile.TemporaryDirectory()
tests.common.gcs_test_setup(
Expand Down
3 changes: 2 additions & 1 deletion tests/test_live_cp_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

class TestCPRemote(unittest.TestCase):
def setUp(self):
self.client = gswrap.Client(bucket_name=tests.common.TEST_GCS_BUCKET)
self.client = gswrap.Client()
self.client._change_bucket(tests.common.TEST_GCS_BUCKET)
self.bucket_prefix = str(uuid.uuid4())
self.tmp_dir = tempfile.TemporaryDirectory()
tests.common.gcs_test_setup(
Expand Down
6 changes: 4 additions & 2 deletions tests/test_live_cp_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

class TestCPUpload(unittest.TestCase):
def setUp(self):
self.client = gswrap.Client(bucket_name=tests.common.TEST_GCS_BUCKET)
self.client = gswrap.Client()
self.client._change_bucket(tests.common.TEST_GCS_BUCKET)
self.bucket_prefix = str(uuid.uuid4())
self.tmp_dir = tempfile.TemporaryDirectory()
tests.common.gcs_test_setup(
Expand Down Expand Up @@ -235,7 +236,8 @@ def test_upload_clobber(self):

class TestCPUploadNoCommonSetup(unittest.TestCase):
def setUp(self):
self.client = gswrap.Client(bucket_name=tests.common.TEST_GCS_BUCKET)
self.client = gswrap.Client()
self.client._change_bucket(tests.common.TEST_GCS_BUCKET)
self.bucket_prefix = str(uuid.uuid4())

def tearDown(self):
Expand Down
4 changes: 3 additions & 1 deletion tests/test_live_ls.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""Test gs-wrap ls live."""

# pylint: disable=missing-docstring
# pylint: disable=protected-access

import tempfile
import unittest
Expand All @@ -15,7 +16,8 @@

class TestLS(unittest.TestCase):
def setUp(self):
self.client = gswrap.Client(bucket_name=tests.common.TEST_GCS_BUCKET)
self.client = gswrap.Client()
self.client._change_bucket(tests.common.TEST_GCS_BUCKET)
self.bucket_prefix = str(uuid.uuid4())
self.tmp_dir = tempfile.TemporaryDirectory()
tests.common.gcs_test_setup(
Expand Down
Loading

0 comments on commit eb8026a

Please sign in to comment.