Skip to content

Commit

Permalink
Merge pull request #252 from mih/credsalad
Browse files Browse the repository at this point in the history
RF of `CredentialManager.get()` related code
  • Loading branch information
mih authored Feb 22, 2023
2 parents 5c2df9b + affea9c commit 232fe61
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 90 deletions.
237 changes: 149 additions & 88 deletions datalad_next/credman/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ def __init__(self, cfg=None):

# main API
#

#
# Design remark:
# This is the keyhole through which any retrieval-related processing goes
# (get, query, obtain). Any normalization performed in here will
# automatically also effect these other operations.
#
def get(self, name=None, *, _prompt=None, _type_hint=None, **kwargs):
"""Get properties and secret of a credential.
Expand Down Expand Up @@ -129,7 +136,8 @@ def get(self, name=None, *, _prompt=None, _type_hint=None, **kwargs):
is used to determine a credential type, to possibly enable further
lookup/entry of additional properties for a known credential type
**kwargs:
Credential property name/value pairs. For any property with a value
Credential property name/value pairs to overwrite/amend potentially
existing properties. For any property with a value
of ``None``, manual data entry will be performed, unless a value
could be retrieved on lookup, or prompting was not enabled.
Expand Down Expand Up @@ -161,98 +169,43 @@ def get(self, name=None, *, _prompt=None, _type_hint=None, **kwargs):
# if we have a chance to query for stored legacy credentials
# we do this first to have the more modern parts of the
# system overwrite them reliably
_type_hint = kwargs.get('type', _type_hint)
if not _type_hint:
# no type hint given in any form. Last chance is that
# this is a known legacy credential.
# doing this query is a bit expensive, but getting a
# credential is not a high-performance procedure, and
# the gain in convenience is substantial -- otherwise
# users would need to somehow know what they should be
# looking for
_type_hint = dict(_yield_legacy_credential_types()).get(name)
cred = self._get_legacy_credential_from_keyring(
name, _type_hint) or {}
if cred:
# at least some info came from the legacy backend, record that
cred['_from_backend'] = 'legacy'
name,
type_hint=kwargs.get('type', _type_hint),
) or {}

# and now take whatever we got from the legacy store and update
# it from what we have in the config
var_prefix = _get_cred_cfg_var(name, '')
cred_update = {
k[len(var_prefix):]: v
for k, v in self._cfg.items()
if k.startswith(var_prefix)
}
cred_update = self._get_credential_from_cfg(name)
if set(cred_update.keys()) >= set(
k for k in cred.keys() if k != '_from_backend'):
# we are overwriting all info from a possible legacy credential
# take marker off
cred.pop('_from_backend', None)
cred.update(cred_update)

# final word on the credential type
_type_hint = cred.get('type', kwargs.get('type', _type_hint))
if _type_hint:
# import the definition of expected fields from the known
# credential types
cred_type = self._cred_types.get(
_type_hint,
dict(fields=[], secret=None))
for k in (cred_type['fields'] or []):
if k == cred_type['secret'] or k in kwargs:
# do nothing, if this is the secret key
# or if we have an incoming value for this key already
continue
# otherwise make sure we prompt for the essential
# fields
kwargs[k] = None

prompted = False
for k, v in kwargs.items():
if k == 'secret':
# handled below
continue
if _prompt and v is None and cred.get(k) is None:
# ask if enabled, no value was provided,
# and no value is already on record
v = self._ask_property(k, None if prompted else _prompt)
if v is not None:
prompted = True
if v:
cred[k] = v

# start locating the secret at the method parameters
secret = kwargs.get('secret')
if secret is None and name:
# get the secret, from the effective config, not just the keystore
secret = self._get_secret(name, type_hint=_type_hint)
if _prompt and secret is None:
secret = self._ask_secret(
type_hint=self._cred_types.get(
_type_hint, {}).get('secret'),
prompt=None if prompted else _prompt,
)
if secret:
prompted = True

if secret:
cred['secret'] = secret

if not cred:
# if there is absolutely nothing to report, report None
# we merge existing credential properties with overrides.
# we are only adding 'enter-please' markers (i.e. None) for properties
# that have no known value yet
cred.update(**{k: v for k, v in kwargs.items()
if v is not None or k not in cred})

# final word on the credential type, if there is any type info at all
# `cred` will have a `type` property after this
self._assign_credential_type(cred, _type_hint)

# determine what is missing and possibly prompt for it
self._complete_credential_props(name, cred, _prompt)

if not cred.get('secret'):
# no secret, no credential
if any(not p.startswith('_') for p in cred):
lgr.debug(
'Not reporting on credential fragment with no secret: %r',
cred,
)
return

if 'type' not in cred and _type_hint:
# make sure we always report a type whenever we have any clue
cred['type'] = _type_hint

# report whether there were any edits to the credential record
# (incl. being entirely new), such that consumers can decide
# to save a credentials, once battle-tested
if prompted:
cred['_edited'] = True
return cred

def set(self,
Expand All @@ -269,11 +222,12 @@ def set(self,
when support for alternative backends is added, at which point
the ``name`` parameter would become optional.
All properties provided as `kwargs` with values that are not ``None``
will be stored. If ``kwargs`` do not contain a ``secret`` specification,
manual entry will be attempted. The associated prompt with be either
the name of the ``secret`` field of a known credential (as identified via
a ``type`` property), or the label ``'secret'``.
All properties provided as `kwargs` with keys not starting with `_` and
with values that are not ``None`` will be stored. If ``kwargs`` do not
contain a ``secret`` specification, manual entry will be attempted. The
associated prompt with be either the name of the ``secret`` field of a
known credential (as identified via a ``type`` property), or the label
``'secret'``.
All properties with an associated value of ``None`` will be removed
(unset).
Expand Down Expand Up @@ -361,14 +315,15 @@ def set(self,
# forcing each caller to to this by hand is kinda pointless, if
# they can never be stored anyway, and e.g. a previous `get()`
# would include one for any credentials that was manually entered
kwargs = {k: v for k, v in kwargs.items() if not k.startswith('_')}
kwargs = self._strip_internal_properties(kwargs)
# check syntax for the rest
verify_property_names(kwargs)
# if we know the type, hence we can do a query for legacy secrets
# and properties. This will migrate them to the new setup
# over time
type_hint = kwargs.get('type')
cred = self._get_legacy_credential_from_keyring(name, type_hint) or {}
cred = self._strip_internal_properties(cred)
# amend with given properties
cred.update(**kwargs)
# update last-used, if requested
Expand Down Expand Up @@ -535,6 +490,10 @@ def query_(self, **kwargs):
# an actual credential property.
# the credentials command will then also treat it as such
cred['_from_backend'] = 'legacy'
if not cred:
# no info on such a credential, not even legacy info
# ignore
continue
if not kwargs:
yield (name, cred)
else:
Expand Down Expand Up @@ -724,6 +683,94 @@ def obtain(self,

# internal helpers
#
def _strip_internal_properties(self, cred: Dict) -> Dict:
return {k: v for k, v in cred.items() if not k.startswith('_')}

def _assign_credential_type(self, cred, _type_hint=None):
"""Set 'type' property (in-place)"""
_type_hint = cred.get('type', _type_hint)
if _type_hint:
cred['type'] = _type_hint

def _complete_credential_props(
self, name: str,
cred: Dict,
prompt: str | None,
) -> None:
"""Determine missing credential properties, and fill them in
What properties are missing is determined based on credential
type info, and their values will be prompted for (if a prompt
was provided).
The given credential is modified in place.
"""
ct = cred.get('type')
if ct:
# import the definition of expected fields from the known
# credential types
cred_type_def = self._cred_types.get(
ct,
dict(fields=[], secret=None))
for k in (cred_type_def['fields'] or []):
if k == cred_type_def['secret'] or k in cred:
# do nothing, if this is the secret key
# or if we have an incoming value for this key already
continue
# otherwise make sure we prompt for the essential
# fields
cred[k] = None

# - prompt for required but missing prompts
# - retrieve a secret
prompted = False
entered = {}
for k, v in cred.items():
if k == 'secret':
# handled below
continue
if prompt and v is None:
# prevent double-prompting for entering a series of properties
# of the same credential
v = self._ask_property(k, None if prompted else prompt)
if v is not None:
prompted = True
if v:
entered[k] = v

# bulk merged, cannot do in-loop above, because we iterate over items()
cred.update(entered)

# start locating the secret at the method parameters
secret = cred.get('secret')
if secret is None and name:
# get the secret, from the effective config, not just the keystore
secret = self._get_secret(name, type_hint=ct)
if prompt and secret is None:
secret = self._ask_secret(
type_hint=self._cred_types.get(ct, {}).get('secret'),
prompt=None if prompted else prompt,
)
if secret:
prompted = True

if secret:
cred['secret'] = secret

# report whether there were any edits to the credential record
# (incl. being entirely new), such that consumers can decide
# to save a credentials, once battle-tested
if prompted:
cred['_edited'] = True

def _get_credential_from_cfg(self, name: str) -> Dict:
var_prefix = _get_cred_cfg_var(name, '')
return {
k[len(var_prefix):]: v
for k, v in self._cfg.items()
if k.startswith(var_prefix)
}

def _get_known_credential_names(self) -> set:
known_credentials = set(
'.'.join(k.split('.')[2:-1]) for k in self._cfg.keys()
Expand Down Expand Up @@ -781,12 +828,23 @@ def _unset_credprops_anyscope(self, name, keys):
def _get_legacy_credential_from_keyring(
self,
name: str,
type_hint: str,
type_hint: str | None,
) -> Dict | None:
"""With a ``type_hint`` given, attempts to retrieve a credential
"""With a ``type_hint`` given or determined from a known legacy
credential, attempts to retrieve a credential
comprised of all fields defined in ``self._cred_types``. Otherwise
``None`` is returned.
"""
if not type_hint:
# no type hint given in any form. Last chance is that
# this is a known legacy credential.
# doing this query is a bit expensive, but getting a
# credential is not a high-performance procedure, and
# the gain in convenience is substantial -- otherwise
# users would need to somehow know what they should be
# looking for
type_hint = dict(_yield_legacy_credential_types()).get(name)

if not type_hint or type_hint not in self._cred_types:
return

Expand All @@ -805,6 +863,9 @@ def _get_legacy_credential_from_keyring(
# there is nothing on a legacy credential with this name
return None
else:
# at least some info came from the legacy backend, record that
cred['_from_backend'] = 'legacy'
cred['type'] = type_hint
return cred

def _get_secret(self, name, type_hint=None):
Expand Down
6 changes: 4 additions & 2 deletions datalad_next/credman/tests/test_credman.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ def check_credmanager():
# doesn't work with thing air
assert_raises(ValueError, credman.get)
eq_(credman.get('donotexiststest'), None)
# but if there is anything, report it
# we get reports as soon as there is a secret available
# this makes it possible to discover credential fragments, if only to
# expose them for clean-up
eq_(credman.get(crazy='empty'), {'crazy': 'empty'})
eq_(credman.get(crazy='empty'), None)
eq_(credman.get(crazy='empty', secret='bogus'),
{'crazy': 'empty', 'secret': 'bogus'})
# does not fiddle with a secret that is readily provided
eq_(credman.get('dummy', secret='mike', _type_hint='token'),
dict(type='token', secret='mike'))
Expand Down

0 comments on commit 232fe61

Please sign in to comment.