Overhaul fetch transport module to support plug-in transport handlers. Addresses #37.

Improve unit test coverage.
mikedarcy committed Oct 19, 2019
1 parent 756a8b5 commit 88ed9e3
Showing 59 changed files with 1,166 additions and 604 deletions.
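The core of this change is in bdbag/fetch/fetcher.py below: the hard-coded scheme dispatch to fetch_http, fetch_ftp, fetch_boto3, and fetch_globus is replaced by per-scheme transport handler objects that are discovered via find_fetcher(), cached in a fetchers dict, and driven through a common interface. The only contract visible in this diff is that a handler subclasses BaseFetchTransport and provides fetch(url, output_path, **kwargs) and cleanup(). A minimal sketch of a plug-in handler written against that contract (the "foo" scheme and the method bodies are illustrative assumptions, not code from this commit):

    from bdbag.fetch.transports.base_transport import BaseFetchTransport

    class FooFetchTransport(BaseFetchTransport):
        """Hypothetical handler for a made-up 'foo://' scheme (illustration only)."""

        def fetch(self, url, output_path, **kwargs):
            # Transfer url to output_path. Per the new fetch_file() below,
            # a handler returns the output path on success or None on failure.
            raise NotImplementedError("illustration only")

        def cleanup(self):
            # Called by cleanup_fetchers() at the end of a fetch run; release
            # any pooled sessions or connections held by this handler.
            pass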
2 changes: 1 addition & 1 deletion bdbag/bdbag_api.py
@@ -409,7 +409,7 @@ def validate_bag(bag_path, fast=False, callback=None, config_file=DEFAULT_CONFIG
     except bdbagit.BaggingInterruptedError as e:
         logger.warning(get_typed_exception(e))
         raise e
-    except Exception as e:
+    except Exception as e:  # pragma: no cover
         raise RuntimeError("Unhandled exception while validating bag: %s" % e)
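(The "# pragma: no cover" comments added throughout this commit are coverage.py exclusion markers: they flag defensive, hard-to-trigger branches so that they no longer count against the unit test coverage this commit aims to improve.)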


16 changes: 13 additions & 3 deletions bdbag/bdbag_config.py
@@ -64,6 +64,18 @@
         "redirect_status_codes": DEFAULT_FETCH_HTTP_REDIRECT_STATUS_CODES,
         COOKIE_JAR_TAG: DEFAULT_COOKIE_JAR_SEARCH_CONFIG
 
     },
+    "https": {
+        "session_config": {
+            "retry_connect": 7,
+            "retry_read": 7,
+            "retry_backoff_factor": 1.0,
+            "retry_status_forcelist": [500, 502, 503, 504]
+        },
+        "allow_redirects": True,
+        "redirect_status_codes": DEFAULT_FETCH_HTTP_REDIRECT_STATUS_CODES,
+        COOKIE_JAR_TAG: DEFAULT_COOKIE_JAR_SEARCH_CONFIG
+
+    },
     "s3": {
         "read_chunk_size": 10 * Megabyte,
@@ -78,7 +90,6 @@
 DEFAULT_RESOLVER_CONFIG = {
     "ark": [
         {
-            "prefix": None,
             ID_RESOLVER_TAG: DEFAULT_ID_RESOLVERS
         },
         {
@@ -148,8 +159,7 @@ def write_config(config=DEFAULT_CONFIG, config_file=DEFAULT_CONFIG_FILE):
         cf.write(json.dumps(config if config is not None else DEFAULT_CONFIG, indent=4, sort_keys=True))
         cf.close()
     except Exception as e:
-        logger.debug("Unable to create configuration file %s. %s" %
-                     (config_file, get_typed_exception(e)))
+        logger.warning("Unable to create configuration file %s. %s" % (config_file, get_typed_exception(e)))
 
 
 def read_config(config_file, create_default=True, auto_upgrade=False):
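The new "https" block gives HTTPS transfers their own session settings instead of falling through to the "http" entry. Judging by the key names, the retry_* values correspond to urllib3 retry parameters; a sketch of how such a session_config plausibly maps onto a requests session (an inference from the key names, not code from this commit):

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    session_config = {
        "retry_connect": 7,
        "retry_read": 7,
        "retry_backoff_factor": 1.0,
        "retry_status_forcelist": [500, 502, 503, 504],
    }

    # Build a retry policy from the configured values and attach it
    # to all https:// requests made through this session.
    retry = Retry(connect=session_config["retry_connect"],
                  read=session_config["retry_read"],
                  backoff_factor=session_config["retry_backoff_factor"],
                  status_forcelist=session_config["retry_status_forcelist"])
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retry))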
6 changes: 3 additions & 3 deletions bdbag/bdbagit.py
@@ -23,7 +23,7 @@
 # bagit-python fixes the underlying issue
 try:
     algorithms_guaranteed = hashlib.algorithms_guaranteed
-except AttributeError:
+except AttributeError:  # pragma: no cover
     # Python 2.7.0-2.7.8
     hashlib.algorithms_guaranteed = set(hashlib.algorithms)
@@ -41,7 +41,7 @@
 def parse_version(version):
     try:
         return tuple(int(i) for i in version.split(".", 1))
-    except ValueError:
+    except ValueError:  # pragma: no cover
         raise BagError(
             _("Bag version numbers must be MAJOR.MINOR numbers, not %s") % version
         )
@@ -587,7 +587,7 @@ def _validate_entries(self, processes, callback=None):
         except BaggingInterruptedError:
             raise
         # Any unhandled exceptions are probably fatal
-        except:
+        except:  # pragma: no cover
             LOGGER.error(_("Unable to calculate file hashes for %s"), self)
             raise

9 changes: 9 additions & 0 deletions bdbag/fetch/__init__.py
@@ -19,6 +19,15 @@
 Kilobyte = 1024
 Megabyte = Kilobyte ** 2
 
+SCHEME_HTTP = 'http'
+SCHEME_HTTPS = 'https'
+SCHEME_S3 = 's3'
+SCHEME_GS = 'gs'
+SCHEME_GLOBUS = 'globus'
+SCHEME_FTP = 'ftp'
+SCHEME_SFTP = 'sftp'
+SCHEME_TAG = 'tag'
+
 
 def get_transfer_summary(total_bytes, elapsed_time):
     total_secs = elapsed_time.total_seconds()
11 changes: 11 additions & 0 deletions bdbag/fetch/auth/cookies.py
@@ -68,3 +68,14 @@ def load_and_merge_cookie_jars(cookie_jar_paths):
                 cookie.expires = None
 
     return cookie_jar
+
+
+def get_request_cookies(config):
+    cookie_jars = set()
+    fetch_config = config.get(FETCH_CONFIG_TAG, DEFAULT_FETCH_CONFIG)
+    for config in fetch_config.values():
+        cookie_jar_config = config.get(COOKIE_JAR_TAG, DEFAULT_COOKIE_JAR_SEARCH_CONFIG)
+        if cookie_jar_config.get(COOKIE_JAR_SEARCH_TAG, True):
+            cookie_jars.update(set(find_cookie_jars(cookie_jar_config)))
+    cookies = load_and_merge_cookie_jars(cookie_jars)
+    return cookies
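The relocated get_request_cookies() (its fetcher.py predecessor, removed below, consulted only the "http" entry) now walks every configured transport's settings and merges all discovered cookie jars. A usage sketch mirroring the call sites in fetcher.py:

    from bdbag.bdbag_config import read_config, DEFAULT_CONFIG_FILE
    from bdbag.fetch.auth.cookies import get_request_cookies

    config = read_config(DEFAULT_CONFIG_FILE)
    # One merged cookie jar, drawn from each scheme's cookie jar settings.
    cookies = get_request_cookies(config)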
2 changes: 1 addition & 1 deletion bdbag/fetch/auth/keychain.py
@@ -95,7 +95,7 @@ def read_keychain(keychain_file=DEFAULT_KEYCHAIN_FILE, create_default=True):
         logger.debug("No keychain file specified and no default keychain file found, attempting to create one.")
         try:
             write_keychain(keychain_file=keychain_file)
-        except Exception as e:
+        except Exception as e:  # pragma: no cover
             logger.warning(
                 "Unable to create default keychain file. A keychain file is required for authentication when "
                 "retrieving files from protected remote resources. Either ensure that the default keychain "
125 changes: 57 additions & 68 deletions bdbag/fetch/fetcher.py
@@ -13,27 +13,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
 import os
 import datetime
 import logging
 from collections import namedtuple
 from bdbag import urlsplit, urlunquote, filter_dict
-from bdbag.bdbag_config import *
-from bdbag.fetch.transports import *
-from bdbag.fetch.auth.keychain import *
-from bdbag.fetch.auth.cookies import *
+from bdbag.bdbag_config import read_config, DEFAULT_CONFIG, DEFAULT_CONFIG_FILE, DEFAULT_KEYCHAIN_FILE, \
+    FETCH_CONFIG_TAG, DEFAULT_FETCH_CONFIG, RESOLVER_CONFIG_TAG, DEFAULT_RESOLVER_CONFIG
+from bdbag.fetch.auth.keychain import read_keychain, DEFAULT_KEYCHAIN_FILE
+from bdbag.fetch.auth.cookies import get_request_cookies
 from bdbag.fetch.resolvers import resolve
+from bdbag.fetch.transports import find_fetcher
+from bdbag.fetch.transports.base_transport import BaseFetchTransport
 
 logger = logging.getLogger(__name__)
 
-UNIMPLEMENTED = "Transfer protocol \"%s\" is not supported by this implementation"
-
-SCHEME_HTTP = 'http'
-SCHEME_HTTPS = 'https'
-SCHEME_S3 = 's3'
-SCHEME_GS = 'gs'
-SCHEME_GLOBUS = 'globus'
-SCHEME_FTP = 'ftp'
-SCHEME_SFTP = 'sftp'
-SCHEME_TAG = 'tag'
+UNIMPLEMENTED = "Transfer protocol \"%s\" is not supported."
 
 FetchEntry = namedtuple("FetchEntry", ["url", "length", "filename"])
@@ -46,14 +41,15 @@ def fetch_bag_files(bag,
                     filter_expr=None,
                     **kwargs):
 
-    auth = read_keychain(keychain_file)
+    keychain = read_keychain(keychain_file)
     config = read_config(config_file)
     cookies = get_request_cookies(config) if kwargs.get("cookie_scan", True) else None
+    fetchers = kwargs.get("fetchers") or dict()
     success = True
     current = 0
     total = 0 if not callback else len(set(bag.files_to_be_fetched()))
     start = datetime.datetime.now()
 
     for entry in map(FetchEntry._make, bag.fetch_entries()):
         filename = urlunquote(entry.filename)
         if filter_expr:
@@ -71,16 +67,15 @@
             missing = False
 
         if not force and not missing:
-            if logger.isEnabledFor(logging.DEBUG):
-                logger.debug("Not fetching already present file: %s" % output_path)
-            pass
+            logger.debug("Not fetching already present file: %s" % output_path)
         else:
             result_path = fetch_file(entry.url,
                                      output_path,
-                                     auth,
+                                     keychain=keychain,
                                      size=entry.length,
                                      config=config,
                                      cookies=cookies,
+                                     fetchers=fetchers,
                                      **kwargs)
             if not result_path:
                 success = False
@@ -93,68 +88,62 @@
                 break
     elapsed = datetime.datetime.now() - start
     logger.info("Fetch complete. Elapsed time: %s" % elapsed)
-    cleanup_transports()
+    cleanup_fetchers(fetchers)
     return success
 
 
-def fetch_file(url, path, auth, **kwargs):
-
-    scheme = urlsplit(url).scheme.lower()
-    if SCHEME_HTTP == scheme or SCHEME_HTTPS == scheme:
-        return fetch_http.get_file(url, path, auth, **kwargs)
-    if SCHEME_FTP == scheme:
-        return fetch_ftp.get_file(url, path, auth, **kwargs)
-    if SCHEME_S3 == scheme or SCHEME_GS == scheme:
-        return fetch_boto3.get_file(url, path, auth, **kwargs)
-    if SCHEME_GLOBUS == scheme:
-        return fetch_globus.get_file(url, path, auth, **kwargs)
-    if SCHEME_TAG == scheme:  # pragma: no cover
-        logger.info("The fetch entry for file %s specifies the tag URI %s. Tag URIs may represent objects that "
-                    "cannot be directly resolved as network resources and therefore cannot be automatically fetched. "
-                    "Such files must be acquired outside of the context of this software." % (path, url))
-        return path
-
-    # if we get here, assume the url is an identifier and try to resolve it
-    config = kwargs.get("config")
-    resolver_config = config.get(RESOLVER_CONFIG_TAG, DEFAULT_RESOLVER_CONFIG) if config else DEFAULT_RESOLVER_CONFIG
-    supported_resolvers = resolver_config.keys()
-    if scheme in supported_resolvers:
-        for entry in resolve(url, resolver_config):
-            url = entry.get("url")
-            if url:
-                output_path = fetch_file(url, path, auth, **kwargs)
-                if output_path:
-                    return output_path
-        return None
-
-    logger.warning(UNIMPLEMENTED % scheme)
-    return None
 
 
 def fetch_single_file(url,
                       output_path=None,
                       config_file=DEFAULT_CONFIG_FILE,
                       keychain_file=DEFAULT_KEYCHAIN_FILE,
                       **kwargs):
 
-    auth = read_keychain(keychain_file)
+    keychain = read_keychain(keychain_file)
     config = read_config(config_file)
     cookies = get_request_cookies(config) if kwargs.get("cookie_scan", True) else None
-    result_path = fetch_file(url, output_path, auth, config=config, cookies=cookies, **kwargs)
-    cleanup_transports()
+    fetchers = kwargs.get("fetchers") or dict()
+    result_path = fetch_file(url, output_path,
+                             keychain=keychain,
+                             config=config,
+                             cookies=cookies,
+                             fetchers=fetchers,
+                             **kwargs)
+    cleanup_fetchers(fetchers)
 
     return result_path
 
 
-def get_request_cookies(config):
-    fetch_config = config.get(FETCH_CONFIG_TAG, DEFAULT_FETCH_CONFIG)
-    http_fetch_config = fetch_config.get("http", dict())
-    cookie_jar_config = http_fetch_config.get(COOKIE_JAR_TAG, DEFAULT_COOKIE_JAR_SEARCH_CONFIG)
-    cookies = load_and_merge_cookie_jars(find_cookie_jars(cookie_jar_config)) if \
-        cookie_jar_config.get(COOKIE_JAR_SEARCH_TAG, True) else None
-    return cookies
+def fetch_file(url, output_path, **kwargs):
+    scheme = urlsplit(url).scheme.lower()
+    config = kwargs.get("config") or DEFAULT_CONFIG
+    fetch_config = config.get(FETCH_CONFIG_TAG) or DEFAULT_FETCH_CONFIG
+    fetchers = kwargs.get("fetchers", dict())
+    fetcher = fetchers.get(scheme)
+    if not fetcher:
+        fetcher = find_fetcher(scheme, fetch_config)
+        if fetcher:
+            fetchers[scheme] = fetcher
+    if fetcher:
+        return fetcher.fetch(url, output_path, **kwargs)
+
+    # if we get here, assume the url contains an identifier scheme and try to resolve it as such
+    resolver_config = config.get(RESOLVER_CONFIG_TAG, DEFAULT_RESOLVER_CONFIG) if config else DEFAULT_RESOLVER_CONFIG
+    supported_resolvers = resolver_config.keys()
+    if scheme in supported_resolvers:
+        for entry in resolve(url, resolver_config):
+            url = entry.get("url")
+            if url:
+                result_path = fetch_file(url, output_path, **kwargs)
+                if result_path:
+                    return result_path
+        return None
+
+    logger.warning(UNIMPLEMENTED % scheme)
+    return None
 
 
-def cleanup_transports():
-    fetch_http.cleanup()
-    fetch_ftp.cleanup()
+def cleanup_fetchers(fetchers):
+    if isinstance(fetchers, dict):
+        for fetcher in fetchers.values():
+            if isinstance(fetcher, BaseFetchTransport):
+                fetcher.cleanup()
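A consequence of this structure is that callers can pre-seed the handler cache instead of relying on find_fetcher() discovery: fetch_file() consults the fetchers dict first, and cleanup_fetchers() disposes of whatever accumulated in it. A sketch reusing the hypothetical FooFetchTransport from above (the URLs, scheme, and constructor arguments are illustrative):

    from bdbag.fetch.fetcher import fetch_single_file

    # Default behavior: a handler for "https" is created on demand via
    # find_fetcher() and cleaned up when the call returns.
    local_path = fetch_single_file("https://example.org/files/data.csv", "data.csv")

    # Plug-in behavior: pre-seed the per-scheme cache so "foo://" URLs
    # are served by the custom transport instead of built-in discovery.
    local_path = fetch_single_file("foo://bucket/key.bin", "key.bin",
                                   fetchers={"foo": FooFetchTransport()})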
12 changes: 8 additions & 4 deletions bdbag/fetch/resolvers/__init__.py
@@ -30,9 +30,13 @@ def find_resolver(identifier, resolver_config):
 
     resolver = None
     resolvers = resolver_config.get(scheme, [])
-    for resolver in resolvers:
-        prefix = resolver.get("prefix")
+    for entry in resolvers:
+        prefix = entry.get("prefix")
+        if not prefix:
+            resolver = entry
+            continue
         if prefix and prefix in path.lstrip("/"):
+            resolver = entry
             break
 
     if not resolver:
@@ -52,10 +56,10 @@ def find_resolver(identifier, resolver_config):
     except KeyError:
         module = import_module(module_name)
         clazz = getattr(module, class_name) if module else None
-    except (ImportError, AttributeError):
+    except (ImportError, AttributeError, ValueError):
         pass
     if not clazz:
-        raise RuntimeError("Unable to import specified resolver class %s" % resolver_class)
+        raise RuntimeError("Unable to import specified resolver class: [%s]" % resolver_class)
 
     return clazz(resolver.get(ID_RESOLVER_TAG, DEFAULT_ID_RESOLVERS), resolver_args)
 
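Note the behavioral fix in find_resolver(): previously the loop variable doubled as the result, so when no prefix matched, whichever entry happened to come last was returned. Now an entry without a "prefix" is remembered as the default while scanning continues (hence the removal of "prefix": None from DEFAULT_RESOLVER_CONFIG earlier in this commit), and an entry whose prefix appears in the identifier path wins outright. A self-contained illustration of the selection rule (the entry contents are hypothetical; real entries carry resolver settings):

    def pick(path, entries):
        chosen = None
        for entry in entries:
            prefix = entry.get("prefix")
            if not prefix:
                chosen = entry  # no prefix: remember as default, keep scanning
                continue
            if prefix in path.lstrip("/"):
                chosen = entry  # explicit prefix match wins
                break
        return chosen

    entries = [{"name": "default"}, {"prefix": "57799", "name": "prefixed"}]
    assert pick("/57799/b91234", entries)["name"] == "prefixed"
    assert pick("/99999/xyz", entries)["name"] == "default"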
2 changes: 1 addition & 1 deletion bdbag/fetch/resolvers/ark_resolver.py
@@ -36,7 +36,7 @@ def handle_response(self, response):
         entries = list()
         try:
             content = response.json()
-        except Exception as e:
+        except Exception as e:  # pragma: no cover
             logger.warning(
                 "Unable to parse identifier resolution result: a valid JSON structure was not found. Exception: %s. "
                 "Server response: %s" % (get_typed_exception(e), response.content))
6 changes: 3 additions & 3 deletions bdbag/fetch/resolvers/base_resolver.py
@@ -61,8 +61,8 @@ def resolve(self, identifier, headers=None):
                 break
             r = session.get(resolver_url, headers=headers)
             if r.status_code != 200:
-                logger.error('HTTP GET Failed for %s with code: %s' % (r.url, r.status_code))
-                logger.error("Host %s responded:\n\n%s" % (urlsplit(r.url).netloc, r.text))
+                logger.error('HTTP GET Failed for %s with code: %s' % (resolver_url, r.status_code))
+                logger.error("Host %s responded:\n\n%s" % (urlsplit(resolver_url).netloc, r.text))
                 continue
             else:
                 urls = self.handle_response(r)
@@ -73,7 +73,7 @@
                                 (identifier, ', '.join([url["url"] for url in urls])))
                     break
         else:
-            logger.warning("No file locations were found for identifier %s" % identifier)
+            logger.warning("No file locations were found for identifier: [%s]" % identifier)
 
         session.close()
         return urls
2 changes: 1 addition & 1 deletion bdbag/fetch/resolvers/dataguid_resolver.py
@@ -37,7 +37,7 @@ def handle_response(self, response):
         entries = list()
         try:
             content = response.json()
-        except Exception as e:
+        except Exception as e:  # pragma: no cover
             logger.warning(
                 "Unable to parse identifier resolution result: a valid JSON structure was not found. Exception: %s. "
                 "Server response: %s" % (get_typed_exception(e), response.content))
5 changes: 2 additions & 3 deletions bdbag/fetch/resolvers/doi_resolver.py
@@ -35,14 +35,14 @@ def handle_response(self, response):
         entries = list()
         try:
             content = response.json()
-        except Exception as e:
+        except Exception as e:  # pragma: no cover
             logger.warning(
                 "Unable to parse identifier resolution result: a valid JSON structure was not found. Exception: %s. "
                 "Server response: %s" % (get_typed_exception(e), response.content))
             return entries
 
         base_entry = dict()
-        locations = content.get('contentUrl')
+        locations = content.get('contentUrl', content.get('url'))
         if locations:
             length = content.get("contentSize")
             if length:
@@ -64,6 +64,5 @@
             entry = dict(base_entry)
             entry["url"] = location
             entries.append(entry)
-
         return entries
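The handle_response() change above lets the DOI resolver fall back to a top-level "url" field when the schema.org-style "contentUrl" is absent. An abbreviated illustration of the two response shapes now handled (the payloads are hypothetical; only the keys shown are consulted by the code above):

    schema_org_style = {"contentUrl": "https://example.org/data.tar", "contentSize": 1024}
    plain_style = {"url": "https://example.org/data.tar"}

    for content in (schema_org_style, plain_style):
        locations = content.get('contentUrl', content.get('url'))
        assert locations == "https://example.org/data.tar"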
