diff --git a/boefjes/boefjes/plugins/kat_adr_validator/main.py b/boefjes/boefjes/plugins/kat_adr_validator/main.py index de6604674c0..0dae8adfa50 100644 --- a/boefjes/boefjes/plugins/kat_adr_validator/main.py +++ b/boefjes/boefjes/plugins/kat_adr_validator/main.py @@ -17,8 +17,8 @@ def run_adr_validator(url: str) -> str: def run(boefje_meta: BoefjeMeta) -> List[Tuple[set, Union[bytes, str]]]: - input = boefje_meta.arguments["input"] - api_url = input["api_url"] + input_ooi = boefje_meta.arguments["input"] + api_url = input_ooi["api_url"] hostname = api_url["netloc"]["name"] path = api_url["path"] diff --git a/boefjes/boefjes/plugins/kat_leakix/main.py b/boefjes/boefjes/plugins/kat_leakix/main.py index 232f3051dad..9aa6c9fc3ba 100644 --- a/boefjes/boefjes/plugins/kat_leakix/main.py +++ b/boefjes/boefjes/plugins/kat_leakix/main.py @@ -21,13 +21,13 @@ def run(boefje_meta: BoefjeMeta) -> List[Tuple[set, Union[bytes, str]]]: else: raise NameError(f'Expected an IPAddress of Hostname, but got pk "{pk}"') - for type in ("leak", "service"): + for scope in ("leak", "service"): page_counter = 0 want_next_result = True while want_next_result: want_next_result = False response = requests.get( - f"https://leakix.net/search?scope={type}&q={dork}&page={page_counter}", + f"https://leakix.net/search?scope={scope}&q={dork}&page={page_counter}", headers={"Accept": "application/json", "api-key": getenv("LEAKIX_API")}, ) page_counter += 1 diff --git a/boefjes/boefjes/plugins/kat_nuclei/main.py b/boefjes/boefjes/plugins/kat_nuclei/main.py index 4cfe895b5ff..9ac64f110ac 100644 --- a/boefjes/boefjes/plugins/kat_nuclei/main.py +++ b/boefjes/boefjes/plugins/kat_nuclei/main.py @@ -7,15 +7,15 @@ NUCLEI_IMAGE = "projectdiscovery/nuclei:latest" -def verify_hostname_meta(input): +def verify_hostname_meta(input_ooi): # if the input object is HostnameHTTPURL then the hostname is located in netloc - if "netloc" in input and "name" in input["netloc"]: - netloc_name = input["netloc"]["name"] - port = input["port"] + if "netloc" in input_ooi and "name" in input_ooi["netloc"]: + netloc_name = input_ooi["netloc"]["name"] + port = input_ooi["port"] return f"{netloc_name}:{port}" else: # otherwise the Hostname input object is used - return input["name"] + return input_ooi["name"] def run(boefje_meta: BoefjeMeta) -> List[Tuple[set, Union[bytes, str]]]: diff --git a/boefjes/boefjes/plugins/kat_nuclei_cve/main.py b/boefjes/boefjes/plugins/kat_nuclei_cve/main.py index aae6fcb08c4..390446ea2ad 100644 --- a/boefjes/boefjes/plugins/kat_nuclei_cve/main.py +++ b/boefjes/boefjes/plugins/kat_nuclei_cve/main.py @@ -7,15 +7,15 @@ NUCLEI_IMAGE = "projectdiscovery/nuclei:v2.9.1" -def verify_hostname_meta(input): +def verify_hostname_meta(input_ooi): # if the input object is HostnameHTTPURL then the hostname is located in netloc - if "netloc" in input and "name" in input["netloc"]: - netloc_name = input["netloc"]["name"] - port = input["port"] + if "netloc" in input_ooi and "name" in input_ooi["netloc"]: + netloc_name = input_ooi["netloc"]["name"] + port = input_ooi["port"] return f"{netloc_name}:{port}" else: # otherwise the Hostname input object is used - return input["name"] + return input_ooi["name"] def run(boefje_meta: BoefjeMeta) -> List[Tuple[set, Union[bytes, str]]]: diff --git a/boefjes/boefjes/plugins/kat_nuclei_exposed_panels/main.py b/boefjes/boefjes/plugins/kat_nuclei_exposed_panels/main.py index d8ec3d8612f..d4ae8e66e37 100644 --- a/boefjes/boefjes/plugins/kat_nuclei_exposed_panels/main.py +++ b/boefjes/boefjes/plugins/kat_nuclei_exposed_panels/main.py @@ -7,15 +7,15 @@ NUCLEI_IMAGE = "projectdiscovery/nuclei:v2.9.1" -def verify_hostname_meta(input): +def verify_hostname_meta(input_ooi): # if the input object is HostnameHTTPURL then the hostname is located in netloc - if "netloc" in input and "name" in input["netloc"]: - netloc_name = input["netloc"]["name"] - port = input["port"] + if "netloc" in input_ooi and "name" in input_ooi["netloc"]: + netloc_name = input_ooi["netloc"]["name"] + port = input_ooi["port"] return f"{netloc_name}:{port}" else: # otherwise the Hostname input object is used - return input["name"] + return input_ooi["name"] def run(boefje_meta: BoefjeMeta) -> List[Tuple[set, Union[bytes, str]]]: diff --git a/boefjes/boefjes/plugins/kat_nuclei_take_over/main.py b/boefjes/boefjes/plugins/kat_nuclei_take_over/main.py index 1e59c153748..7f1b756238f 100644 --- a/boefjes/boefjes/plugins/kat_nuclei_take_over/main.py +++ b/boefjes/boefjes/plugins/kat_nuclei_take_over/main.py @@ -7,15 +7,15 @@ NUCLEI_IMAGE = "projectdiscovery/nuclei:v2.9.1" -def verify_hostname_meta(input): +def verify_hostname_meta(input_ooi): # if the input object is HostnameHTTPURL then the hostname is located in netloc - if "netloc" in input and "name" in input["netloc"]: - netloc_name = input["netloc"]["name"] - port = input["port"] + if "netloc" in input_ooi and "name" in input_ooi["netloc"]: + netloc_name = input_ooi["netloc"]["name"] + port = input_ooi["port"] return f"{netloc_name}:{port}" else: # otherwise the Hostname input object is used - return input["name"] + return input_ooi["name"] def run(boefje_meta: BoefjeMeta) -> List[Tuple[set, Union[bytes, str]]]: diff --git a/bytes/bytes/config.py b/bytes/bytes/config.py index cf7f5727d3a..991964b545c 100644 --- a/bytes/bytes/config.py +++ b/bytes/bytes/config.py @@ -47,10 +47,10 @@ def get_settings() -> Settings: def has_pastebin_key() -> bool: settings = get_settings() - return settings.pastebin_api_dev_key != "" + return bool(settings.pastebin_api_dev_key) def has_rfc3161_provider() -> bool: settings = get_settings() - return settings.rfc3161_provider != "" + return bool(settings.rfc3161_provider) diff --git a/bytes/bytes/timestamping/pastebin.py b/bytes/bytes/timestamping/pastebin.py index ba9b286fe43..1d1acb5421a 100644 --- a/bytes/bytes/timestamping/pastebin.py +++ b/bytes/bytes/timestamping/pastebin.py @@ -31,7 +31,7 @@ def store(self, secure_hash: SecureHash) -> RetrievalLink: return RetrievalLink(link) def retrieve(self, link: RetrievalLink) -> SecureHash: - if link == "": + if not link: raise ValueError("Can't retrieve secure-hash from empty link.") paste_id = link.split("/").pop() diff --git a/bytes/bytes/timestamping/rfc3161.py b/bytes/bytes/timestamping/rfc3161.py index 01cfae4706f..ba0b1992ff7 100644 --- a/bytes/bytes/timestamping/rfc3161.py +++ b/bytes/bytes/timestamping/rfc3161.py @@ -21,7 +21,7 @@ def store(self, secure_hash: SecureHash) -> RetrievalLink: def verify(self, link: RetrievalLink, secure_hash: SecureHash) -> bool: # Note: "link" is an inconvenient name for this implementation since it is a token. - if link == "": + if not link: raise ValueError("Can't retrieve secure-hash from empty link.") time_stamp_token = base64.b64decode(str(link)) diff --git a/bytes/tests/unit/test_raw_repository.py b/bytes/tests/unit/test_raw_repository.py index 8f6d73f3418..112f6ea67f0 100644 --- a/bytes/tests/unit/test_raw_repository.py +++ b/bytes/tests/unit/test_raw_repository.py @@ -13,7 +13,7 @@ def has_encryption_keys() -> bool: settings = get_settings() - return settings.kat_private_key_b64 != "" and settings.vws_public_key_b64 != "" + return settings.kat_private_key_b64 and settings.vws_public_key_b64 def test_save_raw(raw_repository: FileRawRepository) -> None: diff --git a/keiko/keiko/keiko.py b/keiko/keiko/keiko.py index bde12068f20..653ae8f485d 100644 --- a/keiko/keiko/keiko.py +++ b/keiko/keiko/keiko.py @@ -43,7 +43,7 @@ ) -def latex_escape(input: Any) -> str: +def latex_escape(text: Any) -> str: """Escape characters that are special in LaTeX. References: @@ -51,14 +51,14 @@ def latex_escape(input: Any) -> str: - http://tex.stackexchange.com/a/34586/43228 - http://stackoverflow.com/a/16264094/2570866 """ - if not isinstance(input, str): - input = str(input) - return input.translate(LATEX_SPECIAL_CHARS) + if not isinstance(text, str): + text = str(text) + return text.translate(LATEX_SPECIAL_CHARS) -def baretext(input_: str) -> str: +def baretext(text: str) -> str: """Remove non-alphanumeric characters from a string.""" - return "".join(filter(str.isalnum, input_)).lower() + return "".join(filter(str.isalnum, text)).lower() @tracer.start_as_current_span("generate_report") @@ -231,6 +231,6 @@ def read_glossary(glossary: str, settings: Settings) -> Dict[str, Tuple[str, str for row in csvreader: # only allow words with baretext representation bare_word = baretext(row[0]) - if bare_word != "": + if bare_word: glossary_entries[baretext(row[0])] = row[0], row[1] return glossary_entries diff --git a/mula/scheduler/queues/pq.py b/mula/scheduler/queues/pq.py index d3317e5d55a..3700034440f 100644 --- a/mula/scheduler/queues/pq.py +++ b/mula/scheduler/queues/pq.py @@ -223,16 +223,16 @@ def is_item_on_queue(self, p_item: models.PrioritizedItem) -> bool: return True - def is_item_on_queue_by_hash(self, hash: str) -> bool: + def is_item_on_queue_by_hash(self, item_hash: str) -> bool: """Check if an item is on the queue by its hash. Args: - hash: The hash of the item to be checked. + item_hash: The hash of the item to be checked. Returns: True if the item is on the queue, False otherwise. """ - item = self.pq_store.get_item_by_hash(self.pq_id, hash) + item = self.pq_store.get_item_by_hash(self.pq_id, item_hash) return item is not None def get_p_item_by_identifier(self, p_item: models.PrioritizedItem) -> Optional[models.PrioritizedItem]: diff --git a/mula/scheduler/repositories/sqlalchemy/task_store.py b/mula/scheduler/repositories/sqlalchemy/task_store.py index a958a2a05e4..db2249a56ac 100644 --- a/mula/scheduler/repositories/sqlalchemy/task_store.py +++ b/mula/scheduler/repositories/sqlalchemy/task_store.py @@ -23,7 +23,7 @@ def __init__(self, datastore: SQLAlchemy) -> None: def get_tasks( self, scheduler_id: Optional[str], - type: Optional[str], + task_type: Optional[str], status: Optional[str], min_created_at: Optional[datetime.datetime], max_created_at: Optional[datetime.datetime], @@ -38,8 +38,8 @@ def get_tasks( if scheduler_id is not None: query = query.filter(models.TaskORM.scheduler_id == scheduler_id) - if type is not None: - query = query.filter(models.TaskORM.type == type) + if task_type is not None: + query = query.filter(models.TaskORM.type == task_type) if status is not None: query = query.filter(models.TaskORM.status == models.TaskStatus(status).name) diff --git a/mula/scheduler/repositories/stores.py b/mula/scheduler/repositories/stores.py index e6347edb8ff..70f229712af 100644 --- a/mula/scheduler/repositories/stores.py +++ b/mula/scheduler/repositories/stores.py @@ -19,7 +19,7 @@ def __init__(self) -> None: def get_tasks( self, scheduler_id: Optional[str], - type: Optional[str], + task_type: Optional[str], status: Optional[str], min_created_at: Optional[datetime.datetime], max_created_at: Optional[datetime.datetime], diff --git a/mula/scheduler/schedulers/scheduler.py b/mula/scheduler/schedulers/scheduler.py index 335e3e8ef8b..ad3d64d4013 100644 --- a/mula/scheduler/schedulers/scheduler.py +++ b/mula/scheduler/schedulers/scheduler.py @@ -300,8 +300,8 @@ def is_space_on_queue(self) -> bool: return True - def is_item_on_queue_by_hash(self, hash: str) -> bool: - return self.queue.is_item_on_queue_by_hash(hash) + def is_item_on_queue_by_hash(self, item_hash: str) -> bool: + return self.queue.is_item_on_queue_by_hash(item_hash) def stop(self) -> None: """Stop the scheduler.""" diff --git a/mula/scheduler/server/server.py b/mula/scheduler/server/server.py index a1de21d1eb6..9ac3035e5ff 100644 --- a/mula/scheduler/server/server.py +++ b/mula/scheduler/server/server.py @@ -231,7 +231,7 @@ def list_tasks( self, request: fastapi.Request, scheduler_id: Optional[str] = None, - type: Optional[str] = None, + task_type: Optional[str] = None, status: Optional[str] = None, offset: int = 0, limit: int = 10, @@ -246,7 +246,7 @@ def list_tasks( results, count = self.ctx.task_store.get_tasks( scheduler_id=scheduler_id, - type=type, + task_type=task_type, status=status, offset=offset, limit=limit, diff --git a/octopoes/bits/oois_in_headers/oois_in_headers.py b/octopoes/bits/oois_in_headers/oois_in_headers.py index 981fe926bd8..cc15d402b81 100644 --- a/octopoes/bits/oois_in_headers/oois_in_headers.py +++ b/octopoes/bits/oois_in_headers/oois_in_headers.py @@ -35,15 +35,15 @@ def run(input_ooi: HTTPHeader, additional_oois: List, config: Dict[str, str]) -> if input_ooi.key.lower() == "content-security-policy": urls_and_hostname = re.findall(r"(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]+", input_ooi.value) - for object in urls_and_hostname: + for url_or_hostname in urls_and_hostname: try: - u = URL(raw=object, network=network.reference) + u = URL(raw=url_or_hostname, network=network.reference) yield u http_header_url = HTTPHeaderURL(header=input_ooi.reference, url=u.reference) yield http_header_url # some hostnames get classified as urls by the regex here, they need to be parsed by another bit except ValidationError: - name = object if object[0] != "." else object[1:] + name = url_or_hostname if url_or_hostname[0] != "." else url_or_hostname[1:] h = Hostname(name=name, network=network.reference) yield h http_header_hostname = HTTPHeaderHostname(header=input_ooi.reference, hostname=h.reference) diff --git a/octopoes/bits/spf_discovery/internetnl_spf_parser.py b/octopoes/bits/spf_discovery/internetnl_spf_parser.py index 43d8a85fd23..11e84f082c8 100644 --- a/octopoes/bits/spf_discovery/internetnl_spf_parser.py +++ b/octopoes/bits/spf_discovery/internetnl_spf_parser.py @@ -131,9 +131,9 @@ def _check_domain_end(tokens): + Optional(dual_cidr_length) ) include = Combine(Optional(qualifier) + CaselessLiteral("include:") + domain_spec) -all = Combine(Optional(qualifier) + CaselessLiteral("all")) +all_ = Combine(Optional(qualifier) + CaselessLiteral("all")) -mechanism = all | include | a | mx | ptr | ip4 | ip6 | exists +mechanism = all_ | include | a | mx | ptr | ip4 | ip6 | exists directive = mechanism terms = ZeroOrMore(OneOrMore(SP) + (directive | modifier)) diff --git a/octopoes/octopoes/models/ooi/software.py b/octopoes/octopoes/models/ooi/software.py index eaedfb66d13..7b243491f2e 100644 --- a/octopoes/octopoes/models/ooi/software.py +++ b/octopoes/octopoes/models/ooi/software.py @@ -18,7 +18,7 @@ class Software(OOI): @classmethod def format_reference_human_readable(cls, reference: Reference) -> str: version = reference.tokenized.version - if version != "": + if version: version = f" {version}" return f"{reference.tokenized.name}{version}" diff --git a/pyproject.toml b/pyproject.toml index 837bdcbbf42..7dd40ae8182 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,8 +25,9 @@ paths = ["."] [tool.ruff] # Enable classic flake8, pyflakes, eradicate, and tidyimport # To be extended with DJ, PT, RUF, C90, D, PL, RET -select = ["E", "F", "ERA", "W", "TID", "I", "G", "INP", "T20", "UP", "ISC", "PTH", "SIM"] -ignore = ["UP006", "UP007"] # non-pep585-annotation and non-pep604-annotation are not actually compatible with 3.8 +select = ["E", "F", "ERA", "W", "TID", "I", "G", "INP", "T20", "UP", "ISC", "PTH", "SIM", "PLC", "A"] +# non-pep585-annotation and non-pep604-annotation are not actually compatible with 3.8 +ignore = ["UP006", "UP007", "A003"] fix = true # Exclude a variety of commonly ignored directories. @@ -73,7 +74,7 @@ task-tags = ["Example", "todo", "TODO", "FIXME"] "boefjes/boefjes/plugins/kat_binaryedge/http_web/normalize.py" = ["ERA001"] "*/packaging/*" = ["INP"] "*/.ci/*" = ["INP"] -"conf.py" = ["INP", "PTH"] +"conf.py" = ["INP", "PTH", "A"] "conftest.py" = ["INP"] "setup.py" = ["INP"] "manage.py" = ["INP"] diff --git a/rocky/katalogus/views/plugin_detail.py b/rocky/katalogus/views/plugin_detail.py index 3d4d1f0a23b..cef6f12d2f2 100644 --- a/rocky/katalogus/views/plugin_detail.py +++ b/rocky/katalogus/views/plugin_detail.py @@ -41,7 +41,7 @@ class PluginDetailView(PluginSettingsListView, BoefjeMixin, TemplateView): def get_scan_history(self) -> Page: scheduler_id = f"{self.plugin.type}-{self.organization.code}" - type = self.plugin.type + plugin_type = self.plugin.type plugin_id = self.plugin.id input_ooi = self.request.GET.get("scan_history_search") status = self.request.GET.get("scan_history_status") @@ -60,7 +60,7 @@ def get_scan_history(self) -> Page: scan_history = scheduler.client.get_lazy_task_list( scheduler_id=scheduler_id, - type=type, + task_type=plugin_type, plugin_id=plugin_id, input_ooi=input_ooi, status=status, diff --git a/rocky/rocky/scheduler.py b/rocky/rocky/scheduler.py index ea4b59f9b29..da36d13280a 100644 --- a/rocky/rocky/scheduler.py +++ b/rocky/rocky/scheduler.py @@ -178,7 +178,7 @@ def list_tasks( def get_lazy_task_list( self, scheduler_id: str, - type: Optional[str] = None, + task_type: Optional[str] = None, status: Optional[str] = None, min_created_at: Optional[datetime.datetime] = None, max_created_at: Optional[datetime.datetime] = None, @@ -189,7 +189,7 @@ def get_lazy_task_list( return LazyTaskList( self, scheduler_id=scheduler_id, - type=type, + type=task_type, status=status, min_created_at=min_created_at, max_created_at=max_created_at, diff --git a/rocky/rocky/views/ooi_detail.py b/rocky/rocky/views/ooi_detail.py index b91f48ec705..03189c9097b 100644 --- a/rocky/rocky/views/ooi_detail.py +++ b/rocky/rocky/views/ooi_detail.py @@ -117,7 +117,7 @@ def get_scan_history(self) -> Page: status=status, min_created_at=min_created_at, max_created_at=max_created_at, - type="boefje", + task_type="boefje", input_ooi=self.get_ooi_id(), plugin_id=plugin_id, ) diff --git a/rocky/tools/add_ooi_information.py b/rocky/tools/add_ooi_information.py index df82cef72f5..5a7e5496d68 100644 --- a/rocky/tools/add_ooi_information.py +++ b/rocky/tools/add_ooi_information.py @@ -254,9 +254,9 @@ def table_to_2d(table_tag): return table -def _map_usage_value(value: str): +def _map_usage_value(value: str) -> bool: value = value.lower().strip() - return value is not None and value != "" and value != "no" + return value is not None and value and value != "no" def wiki_port_tables() -> List[_PortInfo]: diff --git a/rocky/tools/admin.py b/rocky/tools/admin.py index c0bf85bf5cd..f69ccb092b2 100644 --- a/rocky/tools/admin.py +++ b/rocky/tools/admin.py @@ -34,7 +34,7 @@ class OOIInformationAdmin(admin.ModelAdmin): # if pk is not readonly, it will create a new record upon editing def get_readonly_fields(self, request, obj=None): if obj is not None: # editing an existing object - if obj.value == "": + if not obj.value: return self.readonly_fields + ( "id", "consult_api", diff --git a/rocky/tools/forms/finding_type.py b/rocky/tools/forms/finding_type.py index a82a957eac8..3716600b53c 100644 --- a/rocky/tools/forms/finding_type.py +++ b/rocky/tools/forms/finding_type.py @@ -87,8 +87,8 @@ def clean_id(self): return data - def check_finding_type_existence(self, id): - _, created = OOIInformation.objects.get_or_create(id=f"KATFindingType|{id}") + def check_finding_type_existence(self, finding_type_id): + _, created = OOIInformation.objects.get_or_create(id=f"KATFindingType|{finding_type_id}") if not created: raise ValidationError(_("Finding type already exists")) diff --git a/rocky/tools/models.py b/rocky/tools/models.py index fbfb35189cc..fdedcc9556a 100644 --- a/rocky/tools/models.py +++ b/rocky/tools/models.py @@ -257,7 +257,7 @@ def value(self): @property def description(self): - if self.data["description"] == "": + if not self.data["description"]: self.get_internet_description() return self.data["description"] diff --git a/rocky/tools/ooi_form.py b/rocky/tools/ooi_form.py index 1c2a9cdb40c..9b09df97c6f 100644 --- a/rocky/tools/ooi_form.py +++ b/rocky/tools/ooi_form.py @@ -26,7 +26,7 @@ def __init__(self, ooi_class: Type[OOI], connector: OctopoesAPIConnector, *args, self.fields[name] = field def clean(self): - return {key: value for key, value in super().clean().items() if value != ""} + return {key: value for key, value in super().clean().items() if value} def get_fields(self) -> Dict[str, forms.fields.Field]: return self.generate_form_fields()