diff --git a/nethsm-scheme.json b/nethsm-scheme.json index 311d0881..e2549f7b 100644 --- a/nethsm-scheme.json +++ b/nethsm-scheme.json @@ -559,6 +559,7 @@ ], "get": { "description": "Retrieve public key in PEM format.", + "produces": ["application/x-pem-file"], "responses": { "200": { "description": "", @@ -2296,7 +2297,7 @@ "enum": [ "debug", "info", - "warn", + "warning", "error" ], "type": "string" diff --git a/pynitrokey/cli/nethsm.py b/pynitrokey/cli/nethsm.py index fc9f8430..13b70767 100644 --- a/pynitrokey/cli/nethsm.py +++ b/pynitrokey/cli/nethsm.py @@ -11,14 +11,22 @@ import datetime import click +import urllib3 import pynitrokey.nethsm +def make_enum_type(enum_cls): + return click.Choice([variant.value for variant in enum_cls], case_sensitive=False) + + DATETIME_TYPE = click.DateTime(formats=["%Y-%m-%dT%H:%M:%S%z"]) -ROLE_TYPE = click.Choice( - [role.value for role in pynitrokey.nethsm.Role], - case_sensitive=False, -) +ROLE_TYPE = make_enum_type(pynitrokey.nethsm.Role) +LOG_LEVEL_TYPE = make_enum_type(pynitrokey.nethsm.LogLevel) +UNATTENDED_BOOT_STATUS_TYPE = make_enum_type(pynitrokey.nethsm.UnattendedBootStatus) +ALGORITHM_TYPE = make_enum_type(pynitrokey.nethsm.KeyAlgorithm) +MECHANISM_TYPE = make_enum_type(pynitrokey.nethsm.KeyMechanism) +DECRYPT_MODE_TYPE = make_enum_type(pynitrokey.nethsm.DecryptMode) +SIGN_MODE_TYPE = make_enum_type(pynitrokey.nethsm.SignMode) def print_row(values, widths): @@ -52,8 +60,13 @@ def print_table(headers, data): ) @click.option("-u", "--username", "username", help="The NetHSM user name") @click.option("-p", "--password", "password", help="The NetHSM password") +@click.option( + "--verify-tls/--no-verify-tls", + default=True, + help="Whether to verify the TLS certificate of the NetHSM", +) @click.pass_context -def nethsm(ctx, host, version, username, password): +def nethsm(ctx, host, version, username, password, verify_tls): """Interact with NetHSM, see subcommands.""" ctx.ensure_object(dict) @@ -61,6 +74,10 @@ def nethsm(ctx, host, version, username, password): ctx.obj["NETHSM_VERSION"] = version ctx.obj["NETHSM_USERNAME"] = username ctx.obj["NETHSM_PASSWORD"] = password + ctx.obj["NETHSM_VERIFY_TLS"] = verify_tls + + if not verify_tls: + urllib3.disable_warnings() @contextlib.contextmanager @@ -69,6 +86,8 @@ def connect(ctx, require_auth=True): version = ctx.obj["NETHSM_VERSION"] username = None password = None + verify_tls = ctx.obj["NETHSM_VERIFY_TLS"] + if require_auth: username = ctx.obj["NETHSM_USERNAME"] password = ctx.obj["NETHSM_PASSWORD"] @@ -79,11 +98,18 @@ def connect(ctx, require_auth=True): f"[auth] Password for user {username} on NetHSM {host}", hide_input=True ) - with pynitrokey.nethsm.connect(host, version, username, password) as nethsm: + with pynitrokey.nethsm.connect(host, version, username, password, verify_tls) as nethsm: + import urllib3.exceptions + try: yield nethsm except pynitrokey.nethsm.NetHSMError as e: raise click.ClickException(e) + except urllib3.exceptions.MaxRetryError as e: + if isinstance(e.reason, urllib3.exceptions.SSLError): + raise click.ClickException(f"Could not connect to the NetHSM: {e.reason}\nIf you use a self-signed certificate, please set the --no-verify-tls option.") + else: + raise e @nethsm.command() @@ -143,7 +169,7 @@ def provision(ctx, unlock_passphrase, admin_passphrase, system_time): interactively. If the system time is not set, the current system time is used.""" if not system_time: - system_time = datetime.datetime.now() + system_time = datetime.datetime.now(datetime.timezone.utc) with connect(ctx, require_auth=False) as nethsm: nethsm.provision(unlock_passphrase, admin_passphrase, system_time) print(f"NetHSM {nethsm.host} provisioned") @@ -259,3 +285,583 @@ def set_passphrase(ctx, user_id, passphrase): user_id = nethsm.username nethsm.set_passphrase(user_id, passphrase) print(f"Updated the passphrase for user {user_id} on NetHSM {nethsm.host}") + + +@nethsm.command() +@click.pass_context +def info(ctx): + """Query the vendor and product information for a NetHSM.""" + with connect(ctx, require_auth=False) as nethsm: + (vendor, product) = nethsm.get_info() + print(f"Host: {nethsm.host}") + print(f"Vendor: {vendor}") + print(f"Product: {product}") + + +@nethsm.command() +@click.pass_context +def state(ctx): + """Query the state of a NetHSM.""" + with connect(ctx, require_auth=False) as nethsm: + state = nethsm.get_state() + print(f"NetHSM {nethsm.host} is {state.value}") + + +@nethsm.command() +@click.argument("length", type=int) +@click.pass_context +def random(ctx, length): + """Retrieve random bytes from the NetHSM as a Base64 string. + + This command requires authentication as a user with the Operator role.""" + with connect(ctx) as nethsm: + print(nethsm.get_random_data(length)) + + +@nethsm.command() +@click.pass_context +def metrics(ctx): + """Query the metrics of a NetHSM. + + This command requires authentication as a user with the Metrics role.""" + with connect(ctx) as nethsm: + headers = ["Metric", "Value"] + data = nethsm.get_metrics() + print_table(headers, [list(row) for row in sorted(data.items())]) + + +@nethsm.command() +@click.option( + "--details/--no-details", + default=True, + help="Also query the key data", +) +@click.pass_context +def list_keys(ctx, details): + """List all keys on the NetHSM. + + This command requires authentication as a user with the Administrator or + Operator role.""" + with connect(ctx) as nethsm: + key_ids = nethsm.list_keys() + + print(f"Keys on NetHSM {nethsm.host}:") + print() + + headers = ["Key ID"] + if details: + headers += ["Algorithm", "Mechanisms", "Operations"] + data = [] + for key_id in key_ids: + key = nethsm.get_key(key_id=key_id.value) + data.append( + [key_id, key.algorithm, ", ".join(key.mechanisms), key.operations] + ) + else: + data = [[key_id] for key_id in key_ids] + + print_table(headers, data) + + +@nethsm.command() +@click.argument("key_id") +@click.option("--public-key", is_flag=True, help="Query the public key as a PEM file") +@click.pass_context +def get_key(ctx, key_id, public_key): + """Get information about a key on the NetHSM. + + This command requires authentication as a user with the Administrator or + Operator role.""" + with connect(ctx) as nethsm: + if public_key: + print(nethsm.get_key_public_key(key_id)) + else: + key = nethsm.get_key(key_id) + mechanisms = ", ".join(key.mechanisms) + print(f"Key {key_id} on NetHSM {nethsm.host}:") + print(f"Algorithm: {key.algorithm}") + print(f"Mechanisms: {mechanisms}") + print(f"Operations: {key.operations}") + print(f"Modulus: {key.modulus}") + print(f"Public exponent: {key.public_exponent}") + + +@nethsm.command() +@click.argument("key-id") +@click.pass_context +def delete_key(ctx, key_id): + """Delete the key pair with the given key ID on the NetHSM. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + nethsm.delete_key(key_id) + print(f"Key {key_id} deleted on NetHSM {nethsm.host}") + + +def prompt_mechanisms(algorithm): + available_mechanisms = [] + print("Supported mechanisms for this algorithm:") + for mechanism in pynitrokey.nethsm.KeyMechanism: + if mechanism.value.startswith(algorithm): + available_mechanisms.append(mechanism.value) + print(f" {mechanism.value}") + + print("Please enter at least one mechanism. Enter an empty string to " + "finish the list of mechanisms.") + + mechanism_type = click.Choice(available_mechanisms, case_sensitive=False) + mechanisms = [] + cont = True + while cont: + default = None + prompt = "Add mechanism" + if mechanisms: + prompt += " (or empty string to continue)" + default = "" + mechanism = click.prompt( + prompt, + type=mechanism_type, + default=default, + show_choices=False, + show_default=False, + ) + if mechanism: + mechanisms.append(mechanism) + else: + cont = False + + if not mechanisms: + raise click.ClickException("No key mechanisms selected!") + + return mechanisms + + +@nethsm.command() +@click.option( + "-a", + "--algorithm", + type=ALGORITHM_TYPE, + prompt=True, + help="The algorithm for the new key", +) +@click.option( + "-m", + "--mechanism", + "mechanisms", + type=MECHANISM_TYPE, + multiple=True, + help="The mechanisms for the new key", +) +@click.option( + "-p", + "--prime-p", + help="The prime p for RSA keys", +) +@click.option( + "-q", + "--prime-q", + help="The prime q for RSA keys", +) +@click.option( + "-e", + "--public-exponent", + help="The public exponent for RSA keys", +) +@click.option( + "-d", + "--data", + help="The key data for ED25519 or ECDSA_* keys", +) +@click.option( + "-k", + "--key-id", + help="The ID of the new key", +) +@click.pass_context +def add_key(ctx, algorithm, mechanisms, prime_p, prime_q, public_exponent, data, key_id): + """Add a key pair on the NetHSM. + + If the key ID is not set, it is generated by the NetHSM. + + This command requires authentication as a user with the Administrator + role.""" + mechanisms = list(mechanisms) or prompt_mechanisms(algorithm) + + if algorithm == "RSA": + if data: + raise click.ClickException("-d/--data must not be set for RSA keys") + if not prime_p: + prime_p = click.prompt("Prime p") + if not prime_q: + prime_q = click.prompt("Prime q") + if not public_exponent: + public_exponent = click.prompt("Public exponent") + else: + if prime_p: + raise click.ClickException("-p/--prime-p may only be set for RSA keys") + if prime_q: + raise click.ClickException("-q/--prime-q may only be set for RSA keys") + if public_exponent: + raise click.ClickException("-e/--public-exponent may only be set for RSA keys") + if not data: + data = click.prompt("Key data") + + with connect(ctx) as nethsm: + key_id = nethsm.add_key( + key_id=key_id, + algorithm=algorithm, + mechanisms=mechanisms, + prime_p=prime_p, + prime_q=prime_q, + public_exponent=public_exponent, + data=data, + ) + print(f"Key {key_id} added to NetHSM {nethsm.host}") + + +@nethsm.command() +@click.option( + "-a", + "--algorithm", + type=ALGORITHM_TYPE, + prompt=True, + help="The algorithm for the generated key", +) +@click.option( + "-m", + "--mechanism", + "mechanisms", + type=MECHANISM_TYPE, + multiple=True, + help="The mechanisms for the generated key", +) +@click.option( + "-l", + "--length", + type=int, + prompt=True, + help="The length of the generated key", +) +@click.option( + "-k", + "--key-id", + help="The ID of the generated key", +) +@click.pass_context +def generate_key(ctx, algorithm, mechanisms, length, key_id): + """Generate a key pair on the NetHSM. + + This command requires authentication as a user with the Administrator + role.""" + mechanisms = list(mechanisms) or prompt_mechanisms(algorithm) + with connect(ctx) as nethsm: + key_id = nethsm.generate_key(algorithm, mechanisms, length, key_id) + print(f"Key {key_id} generated on NetHSM {nethsm.host}") + + +@nethsm.command() +@click.option("--logging", is_flag=True, help="Query the logging configuration") +@click.option("--network", is_flag=True, help="Query the network configuration") +@click.option("--time", is_flag=True, help="Query the system time") +@click.option( + "--unattended-boot", is_flag=True, help="Query the unattended boot configuration" +) +@click.option("--public-key", is_flag=True, help="Query the public key") +@click.option("--certificate", is_flag=True, help="Query the certificate") +@click.pass_context +def get_config(ctx, **kwargs): + """Query the configuration of a NetHSM. + + Only the configuration items selected with the corresponding option are + queried. If no option is set, all items are queried. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + print(f"Configuration for NetHSM {nethsm.host}:") + show_all = not any(kwargs.values()) + + if show_all or kwargs["logging"]: + data = nethsm.get_config_logging() + print(" Logging:") + print(" IP address: ", data.ip_address) + print(" Port: ", data.port) + print(" Log level: ", data.log_level) + + if show_all or kwargs["network"]: + data = nethsm.get_config_network() + print(" Network:") + print(" IP address: ", data.ip_address) + print(" Netmask: ", data.netmask) + print(" Gateway: ", data.gateway) + + if show_all or kwargs["time"]: + time = nethsm.get_config_time() + print(" Time: ", time) + + if show_all or kwargs["unattended_boot"]: + unattended_boot = nethsm.get_config_unattended_boot() + print(" Unattended boot:", unattended_boot) + + if show_all or kwargs["public_key"]: + public_key = nethsm.get_public_key() + print(" Public key:") + for line in public_key.splitlines(): + print(f" {line}") + + if show_all or kwargs["certificate"]: + certificate = nethsm.get_certificate() + print(" Certificate:") + for line in certificate.splitlines(): + print(f" {line}") + + +@nethsm.command() +@click.option( + "-p", + "--passphrase", + hide_input=True, + confirmation_prompt=True, + prompt=True, + help="The new backup passphrase", +) +@click.pass_context +def set_backup_passphrase(ctx, passphrase): + """Set the backup passphrase of a NetHSM. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + nethsm.set_backup_passphrase(passphrase) + print(f"Updated the backup passphrase for NetHSM {nethsm.host}") + + +@nethsm.command() +@click.option( + "-p", + "--passphrase", + hide_input=True, + confirmation_prompt=True, + prompt=True, + help="The new unlock passphrase", +) +@click.pass_context +def set_unlock_passphrase(ctx, passphrase): + """Set the unlock passphrase of a NetHSM. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + nethsm.set_unlock_passphrase(passphrase) + print(f"Updated the unlock passphrase for NetHSM {nethsm.host}") + + +@nethsm.command() +@click.option( + "-a", + "--ip-address", + help="The IP address of the new logging destination", + required=True, +) +@click.option( + "-p", + "--port", + type=int, + help="The port of the new logging destination", + required=True, +) +@click.option( + "-l", + "--log-level", + type=LOG_LEVEL_TYPE, + help="The new log level", + required=True, +) +@click.pass_context +def set_logging_config(ctx, ip_address, port, log_level): + """Set the logging configuration of a NetHSM. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + nethsm.set_logging_config(ip_address, port, log_level) + print(f"Updated the logging configuration for NetHSM {nethsm.host}") + + +@nethsm.command() +@click.option( + "-a", + "--ip-address", + help="The new IP address", + required=True, +) +@click.option( + "-n", + "--netmask", + help="The new netmask", + required=True, +) +@click.option( + "-g", + "--gateway", + help="The new gateway", + required=True, +) +@click.pass_context +def set_network_config(ctx, ip_address, netmask, gateway): + """Set the network configuration of a NetHSM. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + nethsm.set_network_config(ip_address, netmask, gateway) + print(f"Updated the network configuration for NetHSM {nethsm.host}") + + +@nethsm.command() +@click.argument( + "time", + type=DATETIME_TYPE, + required=False, +) +@click.pass_context +def set_time(ctx, time): + """Set the system time of a NetHSM. + + If the time is not given as an argument, the system time of this system is used. + + This command requires authentication as a user with the Administrator + role.""" + if not time: + time = datetime.datetime.now(datetime.timezone.utc) + with connect(ctx) as nethsm: + nethsm.set_time(time) + print(f"Updated the system time for NetHSM {nethsm.host}") + + +@nethsm.command() +@click.argument( + "status", + type=UNATTENDED_BOOT_STATUS_TYPE, +) +@click.pass_context +def set_unattended_boot(ctx, status): + """Set the unattended boot configuration of a NetHSM. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + nethsm.set_unattended_boot(status) + print(f"Updated the unattended boot configuration for NetHSM {nethsm.host}") + + +@nethsm.command() +@click.pass_context +def system_info(ctx): + """Get system information for a NetHSM instance. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + info = nethsm.get_system_info() + print(f"Host: {nethsm.host}") + print(f"Firmware version: {info.firmware_version}") + print(f"Software version: {info.software_version}") + print(f"Hardware version: {info.hardware_version}") + print(f"Build tag: {info.build_tag}") + + +@nethsm.command() +@click.pass_context +def reboot(ctx): + """Reboot a NetHSM instance. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + nethsm.reboot() + print(f"NetHSM {nethsm.host} is about to reboot") + + +@nethsm.command() +@click.pass_context +def shutdown(ctx): + """Shutdown a NetHSM instance. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + nethsm.shutdown() + print(f"NetHSM {nethsm.host} is about to shutdown") + + +@nethsm.command() +@click.pass_context +def reset(ctx): + """Reset a NetHSM instance. + + This command requires authentication as a user with the Administrator + role.""" + with connect(ctx) as nethsm: + nethsm.reset() + print(f"NetHSM {nethsm.host} is about to reset") + + +@nethsm.command() +@click.option( + "-k", + "--key-id", + prompt=True, + help="The ID of the key to decrypt the data width", +) +@click.option( + "-d", + "--data", + prompt=True, + help="The encrypted data in Base64 encoding", +) +@click.option( + "-m", + "--mode", + type=DECRYPT_MODE_TYPE, + prompt=True, + help="The decrypt mode", +) +@click.pass_context +def decrypt(ctx, key_id, data, mode): + """Decrypt data with a secret key on the NetHSM and print the decrypted message. + + This command requires authentication as a user with the Operator role.""" + with connect(ctx) as nethsm: + print(nethsm.decrypt(key_id, data, mode)) + + +@nethsm.command() +@click.option( + "-k", + "--key-id", + prompt=True, + help="The ID of the key to sign the data width", +) +@click.option( + "-d", + "--data", + prompt=True, + help="The data to sign encoded using Base64", +) +@click.option( + "-m", + "--mode", + type=SIGN_MODE_TYPE, + prompt=True, + help="The sign mode", +) +@click.pass_context +def sign(ctx, key_id, data, mode): + """Sign data with a secret key on the NetHSM and print the signature. + + This command requires authentication as a user with the Operator role.""" + with connect(ctx) as nethsm: + signature = nethsm.sign(key_id, data, mode) + print(signature) diff --git a/pynitrokey/nethsm/__init__.py b/pynitrokey/nethsm/__init__.py index c229ad10..a1f3b58e 100644 --- a/pynitrokey/nethsm/__init__.py +++ b/pynitrokey/nethsm/__init__.py @@ -9,6 +9,8 @@ import contextlib import enum +import json +import re from . import client from .client import ApiException @@ -38,6 +40,106 @@ class State(enum.Enum): LOCKED = "Locked" OPERATIONAL = "Operational" + @staticmethod + def from_model(model_state): + return State.from_string(model_state.value) + + @staticmethod + def from_string(s): + for state in State: + if state.value == s: + return state + raise ValueError(f"Unsupported system state {s}") + + +class LogLevel(enum.Enum): + DEBUG = "debug" + INFO = "info" + WARNING = "warning" + ERROR = "error" + + @staticmethod + def from_model(model_log_level): + return LogLevel.from_string(model_log_level.value) + + @staticmethod + def from_string(s): + for log_level in LogLevel: + if log_level.value == s: + return log_level + raise ValueError(f"Unsupported log level {s}") + + +class UnattendedBootStatus(enum.Enum): + ON = "on" + OFF = "off" + + +class KeyAlgorithm(enum.Enum): + RSA = "RSA" + ED25519 = "ED25519" + ECDSA_P224 = "ECDSA_P224" + ECDSA_P256 = "ECDSA_P256" + ECDSA_P384 = "ECDSA_P384" + ECDSA_P521 = "ECDSA_P521" + + +class KeyMechanism(enum.Enum): + RSA_DECRYPTION_RAW = "RSA_Decryption_RAW" + RSA_DECRYPTION_PKCS1 = "RSA_Decryption_PKCS1" + RSA_DECRYPTION_OAEP_MD5 = "RSA_Decryption_OAEP_MD5" + RSA_DECRYPTION_OAEP_SHA1 = "RSA_Decryption_OAEP_SHA1" + RSA_DECRYPTION_OAEP_SHA224 = "RSA_Decryption_OAEP_SHA224" + RSA_DECRYPTION_OAEP_SHA256 = "RSA_Decryption_OAEP_SHA256" + RSA_DECRYPTION_OAEP_SHA384 = "RSA_Decryption_OAEP_SHA384" + RSA_DECRYPTION_OAEP_SHA512 = "RSA_Decryption_OAEP_SHA512" + RSA_SIGNATURE_PKCS1 = "RSA_Signature_PKCS1" + RSA_SIGNATURE_PSS_MD5 = "RSA_Signature_PSS_MD5" + RSA_SIGNATURE_PSS_SHA1 = "RSA_Signature_PSS_SHA1" + RSA_SIGNATURE_PSS_SHA224 = "RSA_Signature_PSS_SHA224" + RSA_SIGNATURE_PSS_SHA256 = "RSA_Signature_PSS_SHA256" + RSA_SIGNATURE_PSS_SHA384 = "RSA_Signature_PSS_SHA384" + RSA_SIGNATURE_PSS_SHA512 = "RSA_Signature_PSS_SHA512" + ED25519_SIGNATURE = "ED25519_Signature" + ECDSA_P224_SIGNATURE = "ECDSA_P224_Signature" + ECDSA_P256_SIGNATURE = "ECDSA_P256_Signature" + ECDSA_P384_SIGNATURE = "ECDSA_P384_Signature" + ECDSA_P521_SIGNATURE = "ECDSA_P521_Signature" + + +class DecryptMode(enum.Enum): + RAW = "RAW" + PKCS1 = "PKCS1" + OAEP_MD5 = "OAEP_MD5" + OAEP_SHA1 = "OAEP_SHA1" + OAEP_SHA224 = "OAEP_SHA224" + OAEP_SHA256 = "OAEP_SHA256" + OAEP_SHA384 = "OAEP_SHA384" + OAEP_SHA512 = "OAEP_SHA512" + + +class SignMode(enum.Enum): + PKCS1 = "PKCS1" + PSS_MD5 = "PSS_MD5" + PSS_SHA1 = "PSS_SHA1" + PSS_SHA224 = "PSS_SHA224" + PSS_SHA256 = "PSS_SHA256" + PSS_SHA384 = "PSS_SHA384" + PSS_SHA512 = "PSS_SHA512" + ED25519 = "ED25519" + ECDSA_P224 = "ECDSA_P224" + ECDSA_P256 = "ECDSA_P256" + ECDSA_P384 = "ECDSA_P384" + ECDSA_P521 = "ECDSA_P521" + + +class SystemInfo: + def __init__(self, firmware_version, software_version, hardware_version, build_tag): + self.firmware_version = firmware_version + self.software_version = software_version + self.hardware_version = hardware_version + self.build_tag = build_tag + class User: def __init__(self, user_id, real_name, role): @@ -46,19 +148,39 @@ def __init__(self, user_id, real_name, role): self.role = role +class Key: + def __init__(self, key_id, mechanisms, algorithm, operations, modulus, public_exponent): + self.key_id = key_id + self.mechanisms = mechanisms + self.algorithm = algorithm + self.operations = operations + self.modulus = modulus + self.public_exponent = public_exponent + + def _handle_api_exception(e, messages={}, roles=[], state=None): - if e.status == 403 and roles: + if e.status in messages: + message = messages[e.status] + elif e.status == 403 and roles: roles = [role.value for role in roles] message = "Access denied -- this operation requires the role " + " or ".join( roles ) + elif e.status == 401 and roles: + message = "Unauthorized -- invalid username or password" elif e.status == 412 and state: message = f"Precondition failed -- this operation can only be used on a NetHSM in the state {state.value}" - elif e.status in messages: - message = messages[e.status] else: message = f"Unexpected API error {e.status}: {e.reason}" + if e.body: + try: + body = json.loads(e.body) + if "message" in body: + message += "\n" + body["message"] + except json.JSONDecodeError: + pass + raise NetHSMError(message) @@ -68,7 +190,7 @@ def __init__(self, message): class NetHSM: - def __init__(self, host, version, username, password): + def __init__(self, host, version, username, password, verify_tls=True): self.host = host self.version = version self.username = username @@ -78,6 +200,7 @@ def __init__(self, host, version, username, password): config = client.Configuration( host=base_url, username=username, password=password ) + config.verify_ssl = verify_tls self.client = client.ApiClient(configuration=config) def close(self): @@ -88,6 +211,23 @@ def get_api(self): return DefaultApi(self.client) + def get_location(self): + return self.client.last_response.getheaders().get('location', '') + + def get_key_id_from_location(self): + location = self.get_location() + key_id_match = re.fullmatch(f"/api/{self.version}/keys/(.*)", location) + if not key_id_match: + raise click.ClickException("Could not determine the ID of the new key") + return key_id_match[1] + + def get_user_id_from_location(self): + location = self.get_location() + user_id_match = re.fullmatch(f"/api/{self.version}/users/(.*)", location) + if not user_id_match: + raise click.ClickException("Could not determine the ID of the new user") + return user_id_match[1] + def unlock(self, passphrase): from .client.model.unlock_request_data import UnlockRequestData @@ -107,12 +247,10 @@ def lock(self): try: self.get_api().lock_post() except ApiException as e: - # TODO: API docs say 403, but demo server gives 401, see nethsm issue #99 _handle_api_exception( e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR], - messages={401: "Access denied"}, ) def provision(self, unlock_passphrase, admin_passphrase, system_time): @@ -143,9 +281,6 @@ def list_users(self): e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR], - messages={ - 401: "Invalid user name and/or password", - }, ) def get_user(self, user_id): @@ -161,7 +296,7 @@ def get_user(self, user_id): e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR, Role.OPERATOR], - message={ + messages={ 404: f"User {user_id} not found", }, ) @@ -181,8 +316,7 @@ def add_user(self, real_name, role, passphrase, user_id=None): return user_id else: self.get_api().users_post(body=body) - # TODO: determine the user ID generated by the NetHSM - return "[randomly generated user ID]" + return self.get_user_id_from_location() except ApiException as e: _handle_api_exception( e, @@ -224,10 +358,394 @@ def set_passphrase(self, user_id, passphrase): }, ) + def get_info(self): + try: + data = self.get_api().info_get() + return (data.vendor, data.product) + except ApiException as e: + _handle_api_exception(e) + + def get_state(self): + try: + data = self.get_api().health_state_get() + return State.from_model(data.state) + except ApiException as e: + _handle_api_exception(e) + + def get_random_data(self, n): + from .client.model.random_request_data import RandomRequestData + + body = RandomRequestData(length=n) + try: + data = self.get_api().random_post(body=body) + return data.random + except ApiException as e: + _handle_api_exception(e, state=State.OPERATIONAL, roles=[Role.OPERATOR]) + + def get_metrics(self): + try: + return self.get_api().metrics_get() + except ApiException as e: + _handle_api_exception(e, state=State.OPERATIONAL, roles=[Role.METRICS]) + + def list_keys(self): + try: + data = self.get_api().keys_get() + return [item["key"] for item in data.value] + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR, Role.OPERATOR], + ) + + def get_key(self, key_id): + try: + key = self.get_api().keys_key_id_get(key_id=key_id) + return Key( + key_id=key_id, + mechanisms=[mechanism.value for mechanism in key.mechanisms.value], + algorithm=key.algorithm.value, + operations=key.operations, + modulus=key.key.modulus, + public_exponent=key.key.public_exponent, + ) + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR, Role.OPERATOR], + messages={ + 404: f"Key {key_id} not found", + }, + ) + + def get_key_public_key(self, key_id): + try: + return self.get_api().keys_key_id_public_pem_get(key_id=key_id) + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR, Role.OPERATOR], + messages={ + 404: f"Key {key_id} not found", + }, + ) + + def add_key(self, key_id, algorithm, mechanisms, prime_p, prime_q, public_exponent, data): + from .client.model.key_algorithm import KeyAlgorithm + from .client.model.key_mechanism import KeyMechanism + from .client.model.key_mechanisms import KeyMechanisms + from .client.model.key_private_data import KeyPrivateData + from .client.model.private_key import PrivateKey + + if algorithm == "RSA": + key_data = KeyPrivateData( + prime_p=prime_p, + prime_q=prime_q, + public_exponent=public_exponent, + ) + else: + key_data = KeyPrivateData(data=data) + + body = PrivateKey( + algorithm=KeyAlgorithm(algorithm), + mechanisms=KeyMechanisms([KeyMechanism(mechanism) for mechanism in mechanisms]), + key=key_data, + ) + try: + if key_id: + self.get_api().keys_key_id_put(key_id=key_id, body=body) + return key_id + else: + self.get_api().keys_post(body=body) + return self.get_key_id_from_location() + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + messages={ + 400: "Bad request -- specified properties are invalid", + 409: f"Conflict -- a key with the ID {key_id} already exists", + }, + ) + + def delete_key(self, key_id): + try: + self.get_api().keys_key_id_delete(key_id=key_id) + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + messages={ + 404: f"Key {key_id} not found", + }, + ) + + def generate_key(self, algorithm, mechanisms, length, key_id): + from .client.model.key_algorithm import KeyAlgorithm + from .client.model.key_mechanism import KeyMechanism + from .client.model.key_mechanisms import KeyMechanisms + from .client.model.key_generate_request_data import KeyGenerateRequestData + + body = KeyGenerateRequestData( + algorithm=KeyAlgorithm(algorithm), + mechanisms=KeyMechanisms([KeyMechanism(mechanism) for mechanism in mechanisms]), + length=length, + id=key_id or "", + ) + try: + self.get_api().keys_generate_post(body=body) + return key_id or self.get_key_id_from_location() + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + messages={ + 400: "Bad request -- invalid input data", + }, + ) + + def get_config_logging(self): + try: + return self.get_api().config_logging_get() + except ApiException as e: + _handle_api_exception( + e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR] + ) + + def get_config_network(self): + try: + return self.get_api().config_network_get() + except ApiException as e: + _handle_api_exception( + e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR] + ) + + def get_config_time(self): + try: + return self.get_api().config_time_get().time + except ApiException as e: + _handle_api_exception( + e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR] + ) + + def get_config_unattended_boot(self): + try: + return self.get_api().config_unattended_boot_get().status + except ApiException as e: + _handle_api_exception( + e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR] + ) + + def get_public_key(self): + try: + return self.get_api().config_tls_public_pem_get() + except ApiException as e: + _handle_api_exception( + e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR] + ) + + def get_certificate(self): + try: + return self.get_api().config_tls_cert_pem_get() + except ApiException as e: + _handle_api_exception( + e, state=State.OPERATIONAL, roles=[Role.ADMINISTRATOR] + ) + + def set_backup_passphrase(self, passphrase): + from .client.model.backup_passphrase_config import BackupPassphraseConfig + + body = BackupPassphraseConfig(passphrase=Passphrase(passphrase)) + try: + self.get_api().config_backup_passphrase_put(body=body) + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + messages={ + 400: "Bad request -- e. g. weak passphrase", + }, + ) + + def set_unlock_passphrase(self, passphrase): + from .client.model.unlock_passphrase_config import UnlockPassphraseConfig + + body = UnlockPassphraseConfig(passphrase=Passphrase(passphrase)) + try: + self.get_api().config_unlock_passphrase_put(body=body) + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + messages={ + 400: "Bad request -- e. g. weak passphrase", + }, + ) + + def set_logging_config(self, ip_address, port, log_level): + from .client.model.log_level import LogLevel + from .client.model.logging_config import LoggingConfig + + body = LoggingConfig(ip_address=ip_address, port=port, log_level=LogLevel(log_level)) + try: + self.get_api().config_logging_put(body=body) + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + messages={ + 400: "Bad request -- invalid input data", + }, + ) + + def set_network_config(self, ip_address, netmask, gateway): + from .client.model.network_config import NetworkConfig + + body = NetworkConfig(ip_address=ip_address, netmask=netmask, gateway=gateway) + try: + self.get_api().config_network_put(body=body) + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + messages={ + 400: "Bad request -- invalid input data", + }, + ) + + def set_time(self, time): + from .client.model.time_config import TimeConfig + + body = TimeConfig(time=time) + try: + self.get_api().config_time_put(body=body) + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + messages={ + 400: "Bad request -- invalid input data", + }, + ) + + def set_unattended_boot(self, status): + from .client.model.switch import Switch + from .client.model.unattended_boot_config import UnattendedBootConfig + + body = UnattendedBootConfig(status=Switch(status)) + try: + self.get_api().config_unattended_boot_put(body=body) + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + messages={ + 400: "Bad request -- invalid status setting", + }, + ) + + def get_system_info(self): + try: + data = self.get_api().system_info_get() + return SystemInfo( + firmware_version=data.firmware_version, + software_version=data.software_version, + hardware_version=data.hardware_version, + build_tag=data.build_tag, + ) + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + ) + + def reboot(self): + try: + self.get_api().system_reboot_post() + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + ) + + def shutdown(self): + try: + self.get_api().system_shutdown_post() + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + ) + + def reset(self): + try: + self.get_api().system_reset_post() + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.ADMINISTRATOR], + ) + + def decrypt(self, key_id, data, mode): + from .client.model.base64 import Base64 + from .client.model.decrypt_mode import DecryptMode + from .client.model.decrypt_request_data import DecryptRequestData + + body = DecryptRequestData(encrypted=Base64(data), mode=DecryptMode(mode)) + try: + data = self.get_api().keys_key_id_decrypt_post(key_id=key_id, body=body) + return data.decrypted.value + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.OPERATOR], + messages={ + 400: "Bad request -- e. g. invalid encryption mode", + 404: f"Key {key_id} not found", + }, + ) + + def sign(self, key_id, data, mode): + from .client.model.base64 import Base64 + from .client.model.sign_mode import SignMode + from .client.model.sign_request_data import SignRequestData + + body = SignRequestData(message=Base64(data), mode=SignMode(mode)) + try: + data = self.get_api().keys_key_id_sign_post(key_id=key_id, body=body) + return data.signature.value + except ApiException as e: + _handle_api_exception( + e, + state=State.OPERATIONAL, + roles=[Role.OPERATOR], + messages={ + 400: "Bad request -- e. g. invalid sign mode", + 404: f"Key {key_id} not found", + }, + ) + @contextlib.contextmanager -def connect(host, version, username, password): - nethsm = NetHSM(host, version, username, password) +def connect(host, version, username, password, verify_tls=True): + nethsm = NetHSM(host, version, username, password, verify_tls) try: yield nethsm finally: diff --git a/pynitrokey/nethsm/client/api/default_api.py b/pynitrokey/nethsm/client/api/default_api.py index d7b449cc..71ba530d 100644 --- a/pynitrokey/nethsm/client/api/default_api.py +++ b/pynitrokey/nethsm/client/api/default_api.py @@ -3198,7 +3198,7 @@ def __keys_key_id_public_pem_get( }, headers_map={ 'accept': [ - 'application/json' + 'application/x-pem-file' ], 'content_type': [], }, diff --git a/pynitrokey/nethsm/client/model/log_level.py b/pynitrokey/nethsm/client/model/log_level.py index 734186df..3a91bc61 100644 --- a/pynitrokey/nethsm/client/model/log_level.py +++ b/pynitrokey/nethsm/client/model/log_level.py @@ -51,7 +51,7 @@ class LogLevel(ModelSimple): ('value',): { 'DEBUG': "debug", 'INFO': "info", - 'WARN': "warn", + 'WARNING': "warning", 'ERROR': "error", }, } @@ -102,10 +102,10 @@ def __init__(self, *args, **kwargs): Note that value can be passed either in args or in kwargs, but not in both. Args: - args[0] (str):, must be one of ["debug", "info", "warn", "error", ] # noqa: E501 + args[0] (str):, must be one of ["debug", "info", "warning", "error", ] # noqa: E501 Keyword Args: - value (str):, must be one of ["debug", "info", "warn", "error", ] # noqa: E501 + value (str):, must be one of ["debug", "info", "warning", "error", ] # noqa: E501 _check_type (bool): if True, values for parameters in openapi_types will be type checked and a TypeError will be raised if the wrong type is input.