Skip to content

Commit

Permalink
removed python related changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ghufz committed Apr 25, 2024
1 parent 1983e93 commit 14c3fad
Showing 1 changed file with 37 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ class HttpSigningConfiguration:
a private key. The private key is used to sign HTTP requests.
:param private_key_passphrase: A string value specifying the passphrase to decrypt
the private key.
:param private_key_string: Specify the API key as string.
:param signed_headers: A list of strings. Each value is the name of a HTTP header
that must be included in the HTTP signature calculation.
The two special signature headers '(request-target)' and '(created)' SHOULD be
Expand Down Expand Up @@ -108,8 +107,7 @@ class HttpSigningConfiguration:
:param signature_max_validity: The signature max validity, expressed as
a datetime.timedelta value. It must be a positive value.
"""
def __init__(self, key_id, signing_scheme, private_key_path=None,
private_key_string =None
def __init__(self, key_id, signing_scheme, private_key_path,
private_key_passphrase=None,
signed_headers=None,
signing_algorithm=None,
Expand All @@ -119,16 +117,8 @@ class HttpSigningConfiguration:
if signing_scheme not in {SCHEME_HS2019, SCHEME_RSA_SHA256, SCHEME_RSA_SHA512}:
raise Exception("Unsupported security scheme: {0}".format(signing_scheme))
self.signing_scheme = signing_scheme

is_private_key_path_exists = False
if private_key_path is not None and os.path.exists(private_key_path):
is_private_key_path_exists =True
if not is_private_key_path_exists and private_key_string is None:
raise Exception("Private key file orprivate key string not provided.")

self.private_key_path = private_key_path
self.private_key_string = private_key_string

if not os.path.exists(private_key_path):
raise Exception("Private key file does not exist")
self.private_key_path = private_key_path
self.private_key_passphrase = private_key_passphrase
self.signing_algorithm = signing_algorithm
Expand Down Expand Up @@ -220,52 +210,44 @@ class HttpSigningConfiguration:
The private key is used to sign HTTP requests as defined in
https://datatracker.ietf.org/doc/draft-cavage-http-signatures/.
"""
pem_data = ""
if self.private_key_string is not None:
pem_data = self.private_key_string
elif os.path.exists(self.private_key_path):
with open(self.private_key_path,'r')as f:
pem_data = f.read()
else:
raise Exception("API Key either in file or as string is not provided.")

if self.private_key is not None:
return

# Verify PEM Pre-Encapsulation Boundary
r = re.compile(r"\s*-----BEGIN (.*)-----\s+")
m = r.match(pem_data)
if not m:
raise ValueError("Not a valid PEM pre boundary")
pem_header = m.group(1)
if pem_header == 'RSA PRIVATE KEY':
self.private_key = RSA.importKey(pem_data, self.private_key_passphrase)
elif pem_header == 'EC PRIVATE KEY':
self.private_key = ECC.import_key(pem_data, self.private_key_passphrase)
elif pem_header in {'PRIVATE KEY', 'ENCRYPTED PRIVATE KEY'}:
# Key is in PKCS8 format, which is capable of holding many different
# types of private keys, not just EC keys.
(key_binary, pem_header, is_encrypted) = \
PEM.decode(pem_data, self.private_key_passphrase)
(oid, privkey, params) = \
PKCS8.unwrap(key_binary, passphrase=self.private_key_passphrase)
if oid == '1.2.840.10045.2.1':
with open(self.private_key_path, 'r') as f:
pem_data = f.read()
# Verify PEM Pre-Encapsulation Boundary
r = re.compile(r"\s*-----BEGIN (.*)-----\s+")
m = r.match(pem_data)
if not m:
raise ValueError("Not a valid PEM pre boundary")
pem_header = m.group(1)
if pem_header == 'RSA PRIVATE KEY':
self.private_key = RSA.importKey(pem_data, self.private_key_passphrase)
elif pem_header == 'EC PRIVATE KEY':
self.private_key = ECC.import_key(pem_data, self.private_key_passphrase)
elif pem_header in {'PRIVATE KEY', 'ENCRYPTED PRIVATE KEY'}:
# Key is in PKCS8 format, which is capable of holding many different
# types of private keys, not just EC keys.
(key_binary, pem_header, is_encrypted) = \
PEM.decode(pem_data, self.private_key_passphrase)
(oid, privkey, params) = \
PKCS8.unwrap(key_binary, passphrase=self.private_key_passphrase)
if oid == '1.2.840.10045.2.1':
self.private_key = ECC.import_key(pem_data, self.private_key_passphrase)
else:
raise Exception("Unsupported key: {0}. OID: {1}".format(pem_header, oid))
else:
raise Exception("Unsupported key: {0}. OID: {1}".format(pem_header, oid))
else:
raise Exception("Unsupported key: {0}".format(pem_header))
# Validate the specified signature algorithm is compatible with the private key.
if self.signing_algorithm is not None:
supported_algs = None
if isinstance(self.private_key, RSA.RsaKey):
supported_algs = {ALGORITHM_RSASSA_PSS, ALGORITHM_RSASSA_PKCS1v15}
elif isinstance(self.private_key, ECC.EccKey):
supported_algs = ALGORITHM_ECDSA_KEY_SIGNING_ALGORITHMS
if supported_algs is not None and self.signing_algorithm not in supported_algs:
raise Exception(
"Signing algorithm {0} is not compatible with private key".format(
self.signing_algorithm))
raise Exception("Unsupported key: {0}".format(pem_header))
# Validate the specified signature algorithm is compatible with the private key.
if self.signing_algorithm is not None:
supported_algs = None
if isinstance(self.private_key, RSA.RsaKey):
supported_algs = {ALGORITHM_RSASSA_PSS, ALGORITHM_RSASSA_PKCS1v15}
elif isinstance(self.private_key, ECC.EccKey):
supported_algs = ALGORITHM_ECDSA_KEY_SIGNING_ALGORITHMS
if supported_algs is not None and self.signing_algorithm not in supported_algs:
raise Exception(
"Signing algorithm {0} is not compatible with private key".format(
self.signing_algorithm))

def _get_signed_header_info(self, resource_path, method, headers, body, query_params):
"""Build the HTTP headers (name, value) that need to be included in
Expand Down

0 comments on commit 14c3fad

Please sign in to comment.