Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pyupgrade "UP" linting rules integration, code fixes to comply #836

Merged
merged 4 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ lint.select = [
"F", # ruff default
"I", # isort: all
"PL", # pylint: all
"UP", # pyupgrade: all
"S", # flake8-bandit: all
"N", # pep8-naming: all
"RUF100" # ruff: find unused noqa
Expand Down
77 changes: 34 additions & 43 deletions securesystemslib/_gpg/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ def parse_pubkey_payload(data):
ptr += 1
if version_number not in SUPPORTED_PUBKEY_PACKET_VERSIONS:
raise PacketVersionNotSupportedError(
"Pubkey packet version '{}' not supported, must be one of {}".format(
version_number, SUPPORTED_PUBKEY_PACKET_VERSIONS
)
f"Pubkey packet version '{version_number}' not supported, "
f"must be one of {SUPPORTED_PUBKEY_PACKET_VERSIONS}"
)

# NOTE: Uncomment this line to decode the time of creation
Expand All @@ -130,10 +129,10 @@ def parse_pubkey_payload(data):
# as described in section 5.2.3.21.
if algorithm not in SUPPORTED_SIGNATURE_ALGORITHMS:
raise SignatureAlgorithmNotSupportedError(
"Signature algorithm '{}' not "
f"Signature algorithm '{algorithm}' not "
"supported, please verify that your gpg configuration is creating "
"either DSA, RSA, or EdDSA signatures (see RFC4880 9.1. Public-Key "
"Algorithms).".format(algorithm)
"Algorithms)."
)

keyinfo["type"] = SUPPORTED_SIGNATURE_ALGORITHMS[algorithm]["type"]
Expand Down Expand Up @@ -216,8 +215,8 @@ def parse_pubkey_bundle(data):
and not key_bundle[PACKET_TYPE_PRIMARY_KEY]["key"]
):
raise PacketParsingError(
"First packet must be a primary key ('{}'), "
"got '{}'.".format(PACKET_TYPE_PRIMARY_KEY, packet_type)
"First packet must be a primary key "
f"('{PACKET_TYPE_PRIMARY_KEY}'), got '{packet_type}'."
)

if (
Expand Down Expand Up @@ -281,26 +280,23 @@ def parse_pubkey_bundle(data):
)

else:
packets_list = [
PACKET_TYPE_PRIMARY_KEY,
PACKET_TYPE_USER_ID,
PACKET_TYPE_USER_ATTR,
PACKET_TYPE_SUB_KEY,
PACKET_TYPE_SIGNATURE,
]
log.info(
"Ignoring gpg key packet '{}', we only handle packets of "
"types '{}' (see RFC4880 4.3. Packet Tags).".format(
packet_type,
[
PACKET_TYPE_PRIMARY_KEY,
PACKET_TYPE_USER_ID,
PACKET_TYPE_USER_ATTR,
PACKET_TYPE_SUB_KEY,
PACKET_TYPE_SIGNATURE,
],
)
f"Ignoring gpg key packet '{packet_type}', "
"we only handle packets of "
f"types '{packets_list}' (see RFC4880 4.3. Packet Tags)."
)

# Both errors might be raised in parse_packet_header and in this loop
except (PacketParsingError, IndexError) as e:
raise PacketParsingError(
"Invalid public key data at position {}: {}.".format(
position, e
)
f"Invalid public key data at position {position}: {e}."
)

# Go to next packet
Expand Down Expand Up @@ -610,9 +606,7 @@ def get_pubkey_bundle(data, keyid):
"""
if not data:
raise KeyNotFoundError(
"Could not find gpg key '{}' in empty exported key " "data.".format(
keyid
)
f"Could not find gpg key '{keyid}' in empty exported key data."
)

# Parse out master key and subkeys (enriched and verified via certificates
Expand Down Expand Up @@ -643,7 +637,7 @@ def get_pubkey_bundle(data, keyid):

else:
raise KeyNotFoundError(
"Could not find gpg key '{}' in exported key data.".format(keyid)
f"Could not find gpg key '{keyid}' in exported key data."
)

# Add subkeys dictionary to master pubkey "subkeys" field if subkeys exist
Expand Down Expand Up @@ -724,8 +718,8 @@ def parse_signature_packet(
ptr += 1
if version_number not in SUPPORTED_SIGNATURE_PACKET_VERSIONS:
raise ValueError(
"Signature version '{}' not supported, must be one of "
"{}.".format(version_number, SUPPORTED_SIGNATURE_PACKET_VERSIONS)
f"Signature version '{version_number}' not supported, "
f"must be one of {SUPPORTED_SIGNATURE_PACKET_VERSIONS}."
)

# Per default we only parse "signatures of a binary document". Other types
Expand All @@ -737,21 +731,20 @@ def parse_signature_packet(

if signature_type not in supported_signature_types:
raise ValueError(
"Signature type '{}' not supported, must be one of {} "
"(see RFC4880 5.2.1. Signature Types).".format(
signature_type, supported_signature_types
)
f"Signature type '{signature_type}' not supported, "
f"must be one of {supported_signature_types} "
"(see RFC4880 5.2.1. Signature Types)."
)

signature_algorithm = data[ptr]
ptr += 1

if signature_algorithm not in SUPPORTED_SIGNATURE_ALGORITHMS:
raise ValueError(
"Signature algorithm '{}' not "
f"Signature algorithm '{signature_algorithm}' not "
"supported, please verify that your gpg configuration is creating "
"either DSA, RSA, or EdDSA signatures (see RFC4880 9.1. Public-Key "
"Algorithms).".format(signature_algorithm)
"Algorithms)."
)

key_type = SUPPORTED_SIGNATURE_ALGORITHMS[signature_algorithm]["type"]
Expand All @@ -762,10 +755,9 @@ def parse_signature_packet(

if hash_algorithm not in supported_hash_algorithms:
raise ValueError(
"Hash algorithm '{}' not supported, must be one of {}"
" (see RFC4880 9.4. Hash Algorithms).".format(
hash_algorithm, supported_hash_algorithms
)
f"Hash algorithm '{hash_algorithm}' not supported, "
f"must be one of {supported_hash_algorithms} "
"(see RFC4880 9.4. Hash Algorithms)."
)

# Obtain the hashed octets
Expand Down Expand Up @@ -863,11 +855,10 @@ def parse_signature_packet(
if keyid and not keyid.endswith(short_keyid): # pragma: no cover
raise ValueError(
"This signature packet seems to be corrupted. The key ID "
"'{}' of the 'Issuer' subpacket must match the lower 64 bits of the "
"fingerprint '{}' of the 'Issuer Fingerprint' subpacket (see RFC4880 "
"and rfc4880bis-06 5.2.3.28. Issuer Fingerprint).".format(
short_keyid, keyid
)
f"'{short_keyid}' of the 'Issuer' subpacket must match the "
f"lower 64 bits of the fingerprint '{keyid}' of the 'Issuer "
"Fingerprint' subpacket (see RFC4880 and rfc4880bis-06 5.2.3.28. "
"Issuer Fingerprint)."
)

if not info["creation_time"]: # pragma: no cover
Expand All @@ -886,7 +877,7 @@ def parse_signature_packet(
signature = handler.get_signature_params(data[ptr:])

signature_data = {
"keyid": "{}".format(keyid),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this could be just "keyid": keyid, but I I think as a pure conversion commit this is fine too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this line makes no sense like this, will change to "keyid": keyid

"keyid": f"{keyid}",
"other_headers": binascii.hexlify(data[:other_headers_ptr]).decode(
"ascii"
),
Expand Down
15 changes: 6 additions & 9 deletions securesystemslib/_gpg/eddsa.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ def get_pubkey_params(data):
# See 9.2. ECC Curve OID
if curve_oid != ED25519_PUBLIC_KEY_OID:
raise PacketParsingError(
"bad ed25519 curve OID '{}', expected {}'".format(
curve_oid, ED25519_PUBLIC_KEY_OID
)
f"bad ed25519 curve OID '{curve_oid}', "
f"expected {ED25519_PUBLIC_KEY_OID}'"
)

# See 13.3. EdDSA Point Format
Expand All @@ -90,19 +89,17 @@ def get_pubkey_params(data):

if public_key_len != ED25519_PUBLIC_KEY_LENGTH:
raise PacketParsingError(
"bad ed25519 MPI length '{}', expected {}'".format(
public_key_len, ED25519_PUBLIC_KEY_LENGTH
)
f"bad ed25519 MPI length '{public_key_len}', "
f"expected {ED25519_PUBLIC_KEY_LENGTH}'"
)

public_key_prefix = data[ptr]
ptr += 1

if public_key_prefix != ED25519_PUBLIC_KEY_PREFIX:
raise PacketParsingError(
"bad ed25519 MPI prefix '{}', expected '{}'".format(
public_key_prefix, ED25519_PUBLIC_KEY_PREFIX
)
f"bad ed25519 MPI prefix '{public_key_prefix}', "
f"expected '{ED25519_PUBLIC_KEY_PREFIX}'"
)

public_key = data[ptr : ptr + public_key_len - 1]
Expand Down
2 changes: 1 addition & 1 deletion securesystemslib/_gpg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SignatureAlgorithmNotSupportedError(Exception):

class KeyExpirationError(Exception):
def __init__(self, key):
super(KeyExpirationError, self).__init__()
super().__init__()
self.key = key

def __str__(self):
Expand Down
24 changes: 10 additions & 14 deletions securesystemslib/_gpg/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ def create_signature(content, keyid=None, homedir=None, timeout=GPG_TIMEOUT):

keyarg = ""
if keyid:
keyarg = "--local-user {}".format(keyid)
keyarg = f"--local-user {keyid}"

homearg = ""
if homedir:
homearg = "--homedir {}".format(homedir).replace("\\", "/")
homearg = f"--homedir {homedir}".replace("\\", "/")

command = gpg_sign_command(keyarg=keyarg, homearg=homearg)

Expand All @@ -125,12 +125,9 @@ def create_signature(content, keyid=None, homedir=None, timeout=GPG_TIMEOUT):
# https://lists.gnupg.org/pipermail/gnupg-devel/2005-December/022559.html
if gpg_process.returncode != 0:
raise OSError(
"Command '{}' returned "
"non-zero exit status '{}', stderr was:\n{}.".format(
gpg_process.args,
gpg_process.returncode,
gpg_process.stderr.decode(),
)
f"Command '{gpg_process.args}' returned "
f"non-zero exit status '{gpg_process.returncode}', "
f"stderr was:\n{gpg_process.stderr.decode()}."
)

signature_data = gpg_process.stdout
Expand All @@ -146,9 +143,10 @@ def create_signature(content, keyid=None, homedir=None, timeout=GPG_TIMEOUT):
if not signature["keyid"]: # pragma: no cover
log.warning(
"The created signature does not include the hashed subpacket"
" '33' (full keyid). You probably have a gpg version <{}."
" '33' (full keyid). You probably have a gpg version"
f" <{FULLY_SUPPORTED_MIN_VERSION}."
" We will export the public keys associated with the short keyid to"
" compute the full keyid.".format(FULLY_SUPPORTED_MIN_VERSION)
" compute the full keyid."
)

short_keyid = signature["short_keyid"]
Expand All @@ -173,9 +171,7 @@ def create_signature(content, keyid=None, homedir=None, timeout=GPG_TIMEOUT):
# If there is still no full keyid something went wrong
if not signature["keyid"]: # pragma: no cover
raise ValueError(
"Full keyid could not be determined for signature '{}'".format(
signature
)
f"Full keyid could not be determined for signature '{signature}'"
)

# It is okay now to remove the optional short keyid to save space
Expand Down Expand Up @@ -275,7 +271,7 @@ def export_pubkey(keyid, homedir=None, timeout=GPG_TIMEOUT):

homearg = ""
if homedir:
homearg = "--homedir {}".format(homedir).replace("\\", "/")
homearg = f"--homedir {homedir}".replace("\\", "/")

# TODO: Consider adopting command error handling from `create_signature`
# above, e.g. in a common 'run gpg command' utility function
Expand Down
11 changes: 4 additions & 7 deletions securesystemslib/_gpg/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,7 @@ def parse_packet_header(data, expected_type=None): # noqa: PLR0912

if expected_type is not None and packet_type != expected_type:
raise PacketParsingError(
"Expected packet " "{}, but got {} instead!".format(
expected_type, packet_type
)
f"Expected packet {expected_type}, but got {packet_type} instead!"
)

return packet_type, header_len, body_len, header_len + body_len
Expand Down Expand Up @@ -349,8 +347,7 @@ def get_hashing_class(hash_algorithm_id):

except KeyError:
raise ValueError(
"Hash algorithm '{}' not supported, must be one of '{}' "
"(see RFC4880 9.4. Hash Algorithms).".format(
hash_algorithm_id, supported_hashing_algorithms
)
f"Hash algorithm '{hash_algorithm_id}' not supported, "
f"must be one of '{supported_hashing_algorithms}' "
"(see RFC4880 9.4. Hash Algorithms)."
)
2 changes: 1 addition & 1 deletion securesystemslib/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _canonical_string_encoder(string):
A string with the canonical-encoded 'string' embedded.
"""

string = '"%s"' % string.replace("\\", "\\\\").replace('"', '\\"')
string = '"{}"'.format(string.replace("\\", "\\\\").replace('"', '\\"'))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to be the important line in this patch -- everythings else is error messages or otherwise non-critical. This seems correct to me.

I assume the reason for using format() here is the "quote-quoting" that would be tricky with f-strings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the issue is with the .replace('"', '\\"'). I guess we can turn it into an f-string by escaping these quotes, using format might be more readable in the end though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am mistaken, backslashes are not allowed inside the f-string expression. Keeping it as is will be the most readable solution and is accepted by the linter.


return string

Expand Down
2 changes: 1 addition & 1 deletion securesystemslib/hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

SUPPORTED_LIBRARIES.append("pyca_crypto")

class PycaDiggestWrapper(object):
class PycaDiggestWrapper:
"""
<Purpose>
A wrapper around `cryptography.hazmat.primitives.hashes.Hash` which adds
Expand Down
12 changes: 6 additions & 6 deletions securesystemslib/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def get(self, filepath: str) -> Iterator[BinaryIO]:
file_object = open(filepath, "rb")
yield file_object
except OSError:
raise exceptions.StorageError("Can't open %s" % filepath)
raise exceptions.StorageError(f"Can't open {filepath}")
finally:
if file_object is not None:
file_object.close()
Expand Down Expand Up @@ -244,7 +244,7 @@ def put(
destination_file.flush()
os.fsync(destination_file.fileno())
except OSError:
raise exceptions.StorageError("Can't write file %s" % filepath)
raise exceptions.StorageError(f"Can't write file {filepath}")

def remove(self, filepath: str) -> None:
try:
Expand All @@ -254,13 +254,13 @@ def remove(self, filepath: str) -> None:
PermissionError,
OSError,
): # pragma: no cover
raise exceptions.StorageError("Can't remove file %s" % filepath)
raise exceptions.StorageError(f"Can't remove file {filepath}")

def getsize(self, filepath: str) -> int:
try:
return os.path.getsize(filepath)
except OSError:
raise exceptions.StorageError("Can't access file %s" % filepath)
raise exceptions.StorageError(f"Can't access file {filepath}")

def create_folder(self, filepath: str) -> None:
try:
Expand All @@ -277,11 +277,11 @@ def create_folder(self, filepath: str) -> None:
)
else:
raise exceptions.StorageError(
"Can't create folder at %s" % filepath
f"Can't create folder at {filepath}"
)

def list_folder(self, filepath: str) -> List[str]:
try:
return os.listdir(filepath)
except FileNotFoundError:
raise exceptions.StorageError("Can't list folder at %s" % filepath)
raise exceptions.StorageError(f"Can't list folder at {filepath}")
2 changes: 1 addition & 1 deletion tests/check_azure_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_azure_sign(self):
Note that this test requires valid credentials available.
"""

data = "data".encode("utf-8")
data = b"data"

signer = Signer.from_priv_key_uri(self.azure_id, self.azure_pubkey)
sig = signer.sign(data)
Expand Down
2 changes: 1 addition & 1 deletion tests/check_kms_signers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_gcp_sign(self):
assign @jku.
"""

data = "data".encode("utf-8")
data = b"data"

signer = Signer.from_priv_key_uri(f"gcpkms:{self.gcp_id}", self.pubkey)
sig = signer.sign(data)
Expand Down
Loading