Skip to content

Commit

Permalink
Make backend+CLI compatible with new CDS
Browse files Browse the repository at this point in the history
  • Loading branch information
BSchilperoort committed Jul 31, 2024
1 parent f211c70 commit 08480c6
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 60 deletions.
39 changes: 8 additions & 31 deletions era5cli/args/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ def add_config_args(subparsers: argparse._SubParsersAction) -> None:
Adds the 'config' parser with the following arguments:
--show
--uid
--key
--url
Expand All @@ -27,7 +26,7 @@ def add_config_args(subparsers: argparse._SubParsersAction) -> None:
This will create a config file in your home directory, in folder named
".config". The CDS URL, your UID and the CDS keys will be stored here.
To find your key and UID, go to https://cds.climate.copernicus.eu/ and
To find your key, go to https://beta-cds.climate.copernicus.eu/ and
login with your email and password. Then go to your user profile (top
right).
Expand All @@ -54,16 +53,6 @@ def add_config_args(subparsers: argparse._SubParsersAction) -> None:
),
)

config.add_argument(
"--uid",
type=str,
help=textwrap.dedent(
"""
Your CDS User ID, e.g.: 123456
"""
),
)

config.add_argument(
"--key",
type=str,
Expand Down Expand Up @@ -101,27 +90,15 @@ def run_config(args):
Args:
args: Arguments collected by argparse
Returns:
True
"""
if args.show and any((args.uid, args.key)):
if args.show and args.key is not None:
raise InputError("Either call `show` or set the key. Not both.")
if not args.show and (args.uid is None or args.key is None):
raise InputError("Both the UID and the key are required inputs.")
if not args.show and args.key is None:
raise InputError("Your CDS API key is a required input.")
if args.show:
url, fullkey = key_management.load_era5cli_config()
uid, key = fullkey.split(":")
url, key = key_management.load_era5cli_config()
print(
"Contents of .config/era5cli.txt:\n"
f" uid: {uid}\n"
f" key: {key}\n"
f" url: {url}\n"
"Contents of .config/era5cli.txt:\n" f" url: {url}\n" f" key: {key}\n"
)
return True

return key_management.set_config(
url=args.url,
uid=args.uid,
key=args.key,
)
else:
key_management.set_config(args.url, args.key)
55 changes: 33 additions & 22 deletions era5cli/key_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

ERA5CLI_CONFIG_PATH = Path.home() / ".config" / "era5cli" / "cds_key.txt"
CDSAPI_CONFIG_PATH = Path.home() / ".cdsapirc"
DEFAULT_CDS_URL = "https://cds.climate.copernicus.eu/api/v2"
DEFAULT_CDS_URL = "https://cds-beta.climate.copernicus.eu/api"

AUTH_ERR_MSG = "401 Authorization Required"
NO_DATA_ERR_MSG = "There is no data matching your request"
Expand All @@ -22,7 +22,7 @@ class InvalidLoginError(Exception):
"Raised when an invalid login is provided to the cds server."


def attempt_cds_login(url: str, fullkey: str) -> True:
def attempt_cds_login(url: str, key: str) -> True:
"""Attempt to connect to the CDS, to validate the URL and UID + key.
Args:
Expand All @@ -40,14 +40,14 @@ def attempt_cds_login(url: str, fullkey: str) -> True:
"""
connection = cdsapi.Client(
url=url,
key=fullkey,
key=key,
verify=True,
quiet=True, # Supress output to the console from the test retrieve.
)

try:
# Check the URL
connection.status() # pragma: no cover
# Check the URL (broken in 0.7.0...)
# connection.status()

# Checks if the authentication works, without downloading data
connection.retrieve( # pragma: no cover
Expand All @@ -74,7 +74,7 @@ def attempt_cds_login(url: str, fullkey: str) -> True:
if AUTH_ERR_MSG in str(err):
raise InvalidLoginError(
f"{os.linesep}Authorization with the CDS served failed. Likely due to"
" an incorrect key or UID."
" an incorrect key."
f"{os.linesep}Please check your era5cli configuration file: "
f"{ERA5CLI_CONFIG_PATH.resolve()}{os.linesep}"
"Or redefine your configuration with 'era5cli config'"
Expand All @@ -89,22 +89,18 @@ def attempt_cds_login(url: str, fullkey: str) -> True:

def set_config(
url: str,
uid: str,
key: str,
) -> True:
"""Check the user-input configuration. Entry point for the CLI."""
try:
attempt_cds_login(url, fullkey=f"{uid}:{key}")
write_era5cli_config(url, uid, key)
attempt_cds_login(url, key)
write_era5cli_config(url, key)
print(
f"Keys succesfully validated and stored in {ERA5CLI_CONFIG_PATH.resolve()}"
)
return True
except InvalidLoginError:
print(
"Error: the UID and key are rejected by the CDS. "
"Please check and try again."
)
print("Error: the key is rejected by the CDS. " "Please check and try again.")
return False


Expand Down Expand Up @@ -134,41 +130,56 @@ def valid_cdsapi_config() -> bool:
True if a valid key has been found & written to file. Otherwise False.
"""
if CDSAPI_CONFIG_PATH.exists():
url, fullkey = load_cdsapi_config()
url, key = load_cdsapi_config()
try:
if sys.stdin.isatty() and attempt_cds_login(url, fullkey):
if sys.stdin.isatty() and attempt_cds_login(url, key):
userinput = input(
"Valid CDS keys found in the .cdsapirc file. Do you want to use "
"these for era5cli? [Y/n]"
)
if userinput.lower() in ["y", "yes", ""]:
set_config(
url, uid=fullkey.split(":")[0], key=fullkey.split(":")[1]
)
set_config(url, key)
return True
except (ConnectionError, InvalidLoginError, InvalidRequestError):
return False
return False


def load_era5cli_config() -> Tuple[str, str]:
with open(ERA5CLI_CONFIG_PATH, encoding="utf8") as f:
contents = "".join(f.readlines())
if "uid" in contents:
msg = (
"Old config detected. In the new CDS API only a key is required.\n"
"Please look at the new CDS website, and reconfigure your login in "
"era5cli\n"
" https://cds-beta.climate.copernicus.eu/"
)
raise InvalidLoginError(msg)

with open(ERA5CLI_CONFIG_PATH, encoding="utf8") as f:
url = f.readline().replace("url:", "").strip()
uid = f.readline().replace("uid:", "").strip()
key = f.readline().replace("key:", "").strip()
return url, f"{uid}:{key}"
return url, key


def write_era5cli_config(url: str, uid: str, key: str):
def write_era5cli_config(url: str, key: str):
ERA5CLI_CONFIG_PATH.parent.mkdir(exist_ok=True, parents=True)
with open(ERA5CLI_CONFIG_PATH, mode="w", encoding="utf-8") as f:
f.write(f"url: {url}\n")
f.write(f"uid: {uid}\n")
f.write(f"key: {key}\n")


def load_cdsapi_config() -> Tuple[str, str]:
with open(CDSAPI_CONFIG_PATH, encoding="utf-8") as f:
url = f.readline().replace("url:", "").strip()
key_line = f.readline()
if ":" in key_line or "api/v2" in url:
msg = (
"Your CDS API configuration file contains a UID entry/incorrect URL.\n"
"Please look at the new CDS website, and reconfigure your key:\n"
" https://cds-beta.climate.copernicus.eu/"
)
raise InvalidLoginError(msg)
fullkey = f.readline().replace("key:", "").strip()
return url, fullkey
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ classifiers = [
"Programming Language :: Python :: 3.11",
]
dependencies = [
"cdsapi == 0.5.1",
"cdsapi>=0.7.0",
"pathos",
"PTable",
"netCDF4"
Expand Down
10 changes: 4 additions & 6 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class TestAttemptCdsLogin:
def test_status_fail(self):
with patch("cdsapi.Client.status", side_effect=rex.ConnectionError):
with pytest.raises(rex.ConnectionError, match="Failed to connect to CDS"):
key_management.attempt_cds_login(url="test", fullkey="abc:def")
key_management.attempt_cds_login(url="test", key="abc:def")

def test_connection_fail(self):
mp1 = patch("cdsapi.Client.status")
Expand All @@ -123,7 +123,7 @@ def test_connection_fail(self):
key_management.InvalidLoginError,
match="Authorization with the CDS served failed",
):
key_management.attempt_cds_login(url="test", fullkey="abc:def")
key_management.attempt_cds_login(url="test", key="abc:def")

def test_retrieve_fail(self):
mp1 = patch("cdsapi.Client.status")
Expand All @@ -136,12 +136,10 @@ def test_retrieve_fail(self):
key_management.InvalidRequestError,
match="Something changed in the CDS API",
):
key_management.attempt_cds_login(url="test", fullkey="abc:def")
key_management.attempt_cds_login(url="test", key="abc:def")

def test_all_pass(self):
mp1 = patch("cdsapi.Client.status")
mp2 = patch("cdsapi.Client.retrieve")
with mp1, mp2:
assert (
key_management.attempt_cds_login(url="test", fullkey="abc:def") is True
)
assert key_management.attempt_cds_login(url="test", key="abc:def") is True

0 comments on commit 08480c6

Please sign in to comment.