generated from datalad/datalad-extension-template
-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
See #224
- Loading branch information
Showing
6 changed files
with
660 additions
and
337 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
from __future__ import annotations | ||
|
||
from typing import ( | ||
Generator, | ||
) | ||
from .credential import ( | ||
Credential, | ||
Secret, | ||
) | ||
|
||
|
||
class CredentialBackend: | ||
"""Interface to be implemented for any CredentialManager backend""" | ||
def __getitem__(self, identifier: str) -> Credential: | ||
"""Retrieve a particular credential | ||
The returned credentials contains all known properties and a secret | ||
(if available). | ||
Parameters | ||
---------- | ||
identifier: str | ||
The identifier of the credential to be retrieved. | ||
Returns | ||
------- | ||
dict | ||
A mapping of credential properties to their values. The property key | ||
of a credential's secret is 'secret'. | ||
""" | ||
raise NotImplementedError | ||
|
||
def __setitem__(self, identifier: str, value: Credential): | ||
"""Set a particular credential. | ||
Parameters | ||
---------- | ||
identifier: str | ||
The identifier of the credential to be deleted. | ||
value: Credential | ||
The credential to set. | ||
""" | ||
raise NotImplementedError | ||
|
||
def __delitem__(self, identifier: str): | ||
"""Delete a particular credential | ||
Parameters | ||
---------- | ||
identifier: str | ||
The identifier of the credential to be deleted. | ||
""" | ||
raise NotImplementedError | ||
|
||
def __iter__(self) -> Generator[str, None, None]: | ||
"""Yields the identifiers of all known credentials""" | ||
raise NotImplementedError | ||
|
||
def __contains__(self, identifier: str) -> bool: | ||
raise NotImplementedError | ||
|
||
def get_credential_type(self, type_id: str | None = None) -> Type[Credential]: | ||
# the only recognized standard credentials is comprised (at minimum) | ||
# of a 'secret' only. | ||
# particular backends may support additional/different credentials. | ||
# the reason why backends get a say in this at all is that they | ||
# need to ensure that the "secret" (in whatever disguise it is | ||
# presented) actually is put into a secret store -- which may | ||
# be different from the place all other credential properties | ||
# are stored. | ||
# The credential manager is asking the backend on what information | ||
# it needs to know, and how they should be labeled, and feeds | ||
# the collected credential info in this format back to the backend. | ||
return Secret |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
from typing import ( | ||
Dict, | ||
Generator, | ||
Type, | ||
) | ||
|
||
from datalad_next.exceptions import ( | ||
CapturedException, | ||
CommandError, | ||
) | ||
from .backend import CredentialBackend | ||
from .credential import ( | ||
Credential, | ||
Secret, | ||
UserPassword, | ||
Token, | ||
) | ||
|
||
_credential_types = { | ||
'secret': Secret, | ||
'user_password': UserPassword, | ||
'token': Token, | ||
} | ||
|
||
|
||
class GitConfigKeyringBackend(CredentialBackend): | ||
_cfg_section_prefix = 'datalad.credential.' | ||
|
||
def __init__(self, cfg=None): | ||
""" | ||
Parameters | ||
---------- | ||
cfg: ConfigManager, optional | ||
If given, all configuration queries are performed using this | ||
``ConfigManager`` instance. Otherwise ``datalad.cfg`` is used. | ||
""" | ||
self._cfg = cfg | ||
self.__keyring = None | ||
|
||
def get_credential_type( | ||
self, type_id: str | None = None) -> Type[Credential]: | ||
return _credential_types.get(type_id, Secret) | ||
|
||
def __getitem__(self, identifier: str) -> Credential: | ||
var_prefix = self._get_cred_cfg_var(identifier, '') | ||
# get any related info from config | ||
cred_props = { | ||
k[len(var_prefix):]: v | ||
for k, v in self._cfg.items() | ||
if k.startswith(var_prefix) | ||
} | ||
cred = self.get_credential_type(cred_props.get('type'))(**cred_props) | ||
# be statisfied with a secret from the config | ||
if cred.secret: | ||
return cred | ||
|
||
# otherwise ask the secret store, always get the uniform | ||
# 'secret' field | ||
secret = self._keyring.get(identifier, 'secret') | ||
|
||
if not cred_props and not secret: | ||
# no properties, no secret | ||
raise KeyError(f'No credential with name {identifier!r}') | ||
|
||
cred.secret = secret | ||
return cred | ||
|
||
def _get_credential_secret(self, identifier: str) -> str: | ||
return self._keyring.get(identifier, 'secret') | ||
|
||
def __setitem__(self, identifier: str, value: Credential): | ||
# update record, which properties did actually change? | ||
# this is not always the same as the input, e.g. when | ||
# a property would be _set_ at global scope, but is already | ||
# defined at system scope | ||
# TODO is this actually a good thing to do? What if the | ||
# higher scope drops the setting and cripples a user setup? | ||
# the feature-angle here is that an existing piece of | ||
# information is not copied into the user-domain, where it | ||
# is then fixed, and a system update cannot alter it without | ||
# user intervention | ||
updated = {} | ||
|
||
# remove props | ||
# | ||
remove_props = [ | ||
k for k, v in properties.items() if v is None and k != 'secret'] | ||
self._unset_credprops_anyscope(identifier, remove_props) | ||
updated.update(**{k: None for k in remove_props}) | ||
|
||
# set non-secret props | ||
# | ||
set_props = { | ||
k: v for k, v in properties.items() | ||
if v is not None and k != 'secret' | ||
} | ||
for k, v in set_props.items(): | ||
var = self._get_cred_cfg_var(identifier, k) | ||
if self._cfg.get(var) == v: | ||
# desired value already exists, we are not | ||
# storing again to preserve the scope it | ||
# was defined in | ||
continue | ||
# we always write to the global scope (ie. user config) | ||
# credentials are typically a personal, not a repository | ||
# specific entity -- likewise secrets go into a personal | ||
# not repository-specific store | ||
# for custom needs users can directly set the respective | ||
# config | ||
self._cfg.set(var, v, scope='global', force=True, reload=False) | ||
updated[k] = v | ||
if set_props: | ||
# batch reload | ||
self._cfg.reload() | ||
|
||
# at this point we will have a secret. it could be from ENV | ||
# or provided, or entered. we always want to put it in the | ||
# store | ||
if 'secret' in properties: | ||
# TODO at test for setting with secret=None as a property | ||
# this would cause a credential without a secret. Is this possible? | ||
# Is this desirable? If so, document and support explicitly. And | ||
# add a test | ||
self._keyring.set(identifier, 'secret', properties['secret']) | ||
updated['secret'] = properties['secret'] | ||
return updated | ||
|
||
def __delitem__(self, identifier: str): | ||
# prefix for all config variables of this credential | ||
prefix = self._get_cred_cfg_var(identifier, '') | ||
|
||
to_remove = [ | ||
k[len(prefix):] for k in self._cfg.keys() | ||
if k.startswith(prefix) | ||
] | ||
if to_remove: | ||
self._unset_credprops_anyscope(identifier, to_remove) | ||
|
||
# we always use the uniform 'secret' field | ||
self._delete_keyring_field(identifier, 'secret') | ||
|
||
def __iter__(self) -> Generator[str, None, None]: | ||
yield from self._get_known_credential_names() | ||
|
||
def __contains__(self, identifier: str) -> bool: | ||
return identifier in self._get_known_credential_names() | ||
|
||
def _delete_keyring_field(self, record: str, field: str): | ||
# delete the secret from the keystore, if there is any | ||
try: | ||
self._keyring.delete(record, field) | ||
except Exception as e: | ||
if self._keyring.get(record, field) is None: | ||
# whatever it was, the target is reached | ||
CapturedException(e) | ||
else: | ||
# we could not delete the field | ||
raise # pragma: nocover | ||
|
||
@property | ||
def _keyring(self): | ||
"""Returns the DataLad keyring wrapper | ||
This internal property may vanish whenever changes to the supported | ||
backends are made. | ||
""" | ||
if self.__keyring: | ||
return self.__keyring | ||
from datalad.support.keyring_ import keyring | ||
self.__keyring = keyring | ||
return keyring | ||
|
||
def _get_known_credential_names(self) -> set: | ||
known_credentials = set( | ||
'.'.join(k.split('.')[2:-1]) for k in self._cfg.keys() | ||
if k.startswith(self._cfg_section_prefix) | ||
) | ||
return known_credentials | ||
|
||
def _unset_credprops_anyscope(self, name, keys): | ||
"""Reloads the config after unsetting all relevant variables | ||
This method does not modify the keystore. | ||
""" | ||
nonremoved_vars = [] | ||
for k in keys: | ||
var = self._get_cred_cfg_var(name, k) | ||
if var not in self._cfg: | ||
continue | ||
try: | ||
self._cfg.unset(var, scope='global', reload=False) | ||
except CommandError as e: | ||
CapturedException(e) | ||
try: | ||
self._cfg.unset(var, scope='local', reload=False) | ||
except CommandError as e: | ||
CapturedException(e) | ||
nonremoved_vars.append(var) | ||
if nonremoved_vars: | ||
raise RuntimeError( | ||
f"Cannot remove configuration items {nonremoved_vars} " | ||
f"for credential, defined outside global or local " | ||
"configuration scope. Remove manually") | ||
self._cfg.reload() | ||
|
||
def _get_cred_cfg_var(self, name, prop): | ||
"""Return a config variable name for a credential property | ||
Parameters | ||
---------- | ||
name : str | ||
Credential name | ||
prop : str | ||
Property name | ||
Returns | ||
------- | ||
str | ||
""" | ||
return f'{self._cfg_section_prefix}{name}.{prop}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
from typing import ( | ||
Dict, | ||
Generator, | ||
) | ||
|
||
from datalad_next.exceptions import ( | ||
CapturedException, | ||
CommandError, | ||
) | ||
from datalad_next.utils.credman_backend_gitcfg_keyring import ( | ||
GitConfigKeyringBackend, | ||
) | ||
|
||
|
||
class GitConfigKeyringWithLegacySupportBackend(GitConfigKeyringBackend): | ||
def get_credential(self, identifier): | ||
# if we have a chance to query for stored legacy credentials | ||
# we do this first to have the more modern parts of the | ||
# system overwrite them reliably | ||
cred = self._get_legacy_field_from_keyring(identifier, type) or {} | ||
|
||
# retrieve properties from config | ||
cred.update(super().get_credential(identifier)) | ||
|
||
secret = self._cfg.get(self._get_cred_cfg_var(identifier, 'secret')) | ||
|
||
return cred | ||
|
||
def _get_secret_from_keyring(self, name, type_hint=None): | ||
""" | ||
Returns | ||
------- | ||
str or None | ||
None is return when no secret for the given credential name | ||
could be found. Otherwise, the secret is returned. | ||
""" | ||
# always get the uniform | ||
secret = self._keyring.get(name, 'secret') | ||
if secret: | ||
return secret | ||
# fall back on a different "field" that is inferred from the | ||
# credential type | ||
secret_field = self._cred_types.get( | ||
type_hint, {}).get('secret') | ||
if not secret_field: | ||
return | ||
# first try to get it from the config to catch any overrides | ||
secret = self._cfg.get(self._get_cred_cfg_var(name, secret_field)) | ||
if secret is not None: | ||
return secret | ||
secret = self._keyring.get(name, secret_field) | ||
return secret | ||
|
||
def delete_credential(self, | ||
identifier: str, | ||
type_hint: str | None = None) -> bool: | ||
super().delete_credential(self, identifier) | ||
|
||
# remove legacy records too | ||
for field in self._cred_types.get( | ||
type_hint, {}).get('fields', []): | ||
self._delete_keyring_field(identifier, field) | ||
|
||
def list_credentials(self) -> Generator[str, None, None]: | ||
from datalad.downloaders.providers import Providers | ||
|
||
# we must deduplicate between modern and legacy credentials | ||
# first modern ones | ||
reported = set() | ||
for name in super().list_credentials(): | ||
if name not in reported: | ||
yield name | ||
reported.add(name) | ||
|
||
# and the legacy ones | ||
for name in set( | ||
p.credential.name | ||
for p in Providers.from_config_files() | ||
if p.credential | ||
): | ||
if name not in reported: | ||
yield name | ||
reported.add(name) | ||
|
||
def _get_legacy_field_from_keyring(self, name, type_hint): | ||
if not type_hint or type_hint not in self._cred_types: | ||
return | ||
|
||
cred = {} | ||
lc = self._cred_types[type_hint] | ||
for field in (lc['fields'] or []): | ||
if field == lc['secret']: | ||
continue | ||
val = self._keyring.get(name, field) | ||
if val: | ||
# legacy credentials used property names with underscores, | ||
# but this is no longer syntax-compliant -- fix on read | ||
cred[field.replace('_', '-')] = val | ||
if 'type' not in cred: | ||
cred['type'] = type_hint | ||
return cred |
Oops, something went wrong.