Fix ruff rules E711,E712,E713,E731,F541 #2037

Merged 1 commit on Nov 25, 2024
2 changes: 1 addition & 1 deletion bbot/modules/dnstlsrpt.py
@@ -70,7 +70,7 @@ async def filter_event(self, event):
return False, "event is wildcard"

# there's no value in inspecting service records
- if service_record(event.host) == True:
+ if service_record(event.host) is True:
return False, "service record detected"

return True
4 changes: 2 additions & 2 deletions bbot/modules/dotnetnuke.py
@@ -31,7 +31,7 @@ async def setup(self):
self.interactsh_subdomain_tags = {}
self.interactsh_instance = None

- if self.scan.config.get("interactsh_disable", False) == False:
+ if self.scan.config.get("interactsh_disable", False) is False:
try:
self.interactsh_instance = self.helpers.interactsh()
self.interactsh_domain = await self.interactsh_instance.register(callback=self.interactsh_callback)
@@ -93,7 +93,7 @@ async def handle_event(self, event):
detected = True
break

- if detected == True:
+ if detected is True:
# DNNPersonalization Deserialization Detection
for probe_url in [f'{event.data["url"]}/__', f'{event.data["url"]}/', f'{event.data["url"]}']:
result = await self.helpers.request(probe_url, cookies=self.exploit_probe)
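
Both dnstlsrpt.py and dotnetnuke.py pick up E712, which flags equality comparisons against True or False. A minimal standalone sketch of the pattern and its accepted forms (illustrative only, not taken from the bbot codebase):

# E712: avoid "== True" / "== False"; compare identity or rely on truthiness.
detected = True

if detected == True:      # flagged by ruff E712
    print("match")

if detected is True:      # accepted; the form this PR standardizes on
    print("match")

if detected:              # also accepted, and usually the most idiomatic
    print("match")
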
29 changes: 15 additions & 14 deletions bbot/modules/internal/excavate.py
@@ -471,7 +471,7 @@ def __init__(self, excavate):
self.parameterExtractorCallbackDict[r.__name__] = r
regexes_component_list.append(f"${r.__name__} = {r.discovery_regex}")
regexes_component = " ".join(regexes_component_list)
self.yara_rules[f"parameter_extraction"] = (
self.yara_rules["parameter_extraction"] = (
rf'rule parameter_extraction {{meta: description = "contains POST form" strings: {regexes_component} condition: any of them}}'
)

@@ -503,7 +503,7 @@ async def process(self, yara_results, event, yara_rule_settings, discovery_context):
)

if self.excavate.helpers.validate_parameter(parameter_name, parameter_type):
- if self.excavate.in_bl(parameter_name) == False:
+ if self.excavate.in_bl(parameter_name) is False:
parsed_url = urlparse(url)
description = f"HTTP Extracted Parameter [{parameter_name}] ({parameterExtractorSubModule.name} Submodule)"
data = {
@@ -579,7 +579,7 @@ def __init__(self, excavate):
for signature_name, signature in self.signatures.items():
signature_component_list.append(rf"${signature_name} = {signature}")
signature_component = " ".join(signature_component_list)
self.yara_rules[f"error_detection"] = (
self.yara_rules["error_detection"] = (
f'rule error_detection {{meta: description = "contains a verbose error message" strings: {signature_component} condition: any of them}}'
)

@@ -608,7 +608,7 @@ def __init__(self, excavate):
for regex_name, regex in self.regexes.items():
regexes_component_list.append(rf"${regex_name} = /\b{regex.pattern}/ nocase")
regexes_component = " ".join(regexes_component_list)
self.yara_rules[f"serialization_detection"] = (
self.yara_rules["serialization_detection"] = (
f'rule serialization_detection {{meta: description = "contains a possible serialized object" strings: {regexes_component} condition: any of them}}'
)

@@ -656,7 +656,8 @@ async def process(self, yara_results, event, yara_rule_settings, discovery_context):
continue
if parsed_url.scheme in ["http", "https"]:
continue
- abort_if = lambda e: e.scope_distance > 0
+ def abort_if(e):
+     return e.scope_distance > 0
finding_data = {"host": str(host), "description": f"Non-HTTP URI: {parsed_url.geturl()}"}
await self.report(finding_data, event, yara_rule_settings, discovery_context, abort_if=abort_if)
protocol_data = {"protocol": parsed_url.scheme, "host": str(host)}
@@ -769,7 +770,7 @@ class HostnameExtractor(ExcavateRule):
def __init__(self, excavate):
super().__init__(excavate)
if excavate.scan.dns_yara_rules_uncompiled:
self.yara_rules[f"hostname_extraction"] = excavate.scan.dns_yara_rules_uncompiled
self.yara_rules["hostname_extraction"] = excavate.scan.dns_yara_rules_uncompiled

async def process(self, yara_results, event, yara_rule_settings, discovery_context):
for identifier in yara_results.keys():
@@ -817,7 +818,7 @@ async def setup(self):
self.parameter_extraction = bool(modules_WEB_PARAMETER)

self.retain_querystring = False
if self.config.get("retain_querystring", False) == True:
if self.config.get("retain_querystring", False) is True:
self.retain_querystring = True

for module in self.scan.modules.values():
@@ -847,7 +848,7 @@ async def setup(self):
rules_content = f.read()
self.debug(f"Successfully loaded custom yara rules file [{self.custom_yara_rules}]")
else:
self.debug(f"Custom yara rules file is NOT a file. Will attempt to treat it as rule content")
self.debug("Custom yara rules file is NOT a file. Will attempt to treat it as rule content")
rules_content = self.custom_yara_rules

self.debug(f"Final combined yara rule contents: {rules_content}")
@@ -860,7 +861,7 @@ async def setup(self):

rule_match = await self.helpers.re.search(self.yara_rule_name_regex, rule_content)
if not rule_match:
return False, f"Custom Yara formatted incorrectly: could not find rule name"
return False, "Custom Yara formatted incorrectly: could not find rule name"

rule_name = rule_match.groups(1)[0]
c = CustomExtractor(self)
@@ -936,8 +937,8 @@ async def handle_event(self, event):
if event.type == "HTTP_RESPONSE":
# Harvest GET parameters from URL, if it came directly from the target, and parameter extraction is enabled
if (
- self.parameter_extraction == True
- and self.url_querystring_remove == False
+ self.parameter_extraction is True
+ and self.url_querystring_remove is False
and str(event.parent.parent.module) == "TARGET"
):
self.debug(f"Processing target URL [{urlunparse(event.parsed_url)}] for GET parameters")
@@ -949,7 +950,7 @@
regex_name,
additional_params,
) in extract_params_url(event.parsed_url):
- if self.in_bl(parameter_name) == False:
+ if self.in_bl(parameter_name) is False:
description = f"HTTP Extracted Parameter [{parameter_name}] (Target URL)"
data = {
"host": parsed_url.hostname,
@@ -985,7 +986,7 @@ async def handle_event(self, event):
cookie_name = header_value.split("=")[0]
cookie_value = header_value.split("=")[1].split(";")[0]

- if self.in_bl(cookie_value) == False:
+ if self.in_bl(cookie_value) is False:
self.assigned_cookies[cookie_name] = cookie_value
description = f"Set-Cookie Assigned Cookie [{cookie_name}]"
data = {
@@ -1029,7 +1030,7 @@ async def handle_event(self, event):
regex_name,
additional_params,
) in extract_params_location(header_value, event.parsed_url):
- if self.in_bl(parameter_name) == False:
+ if self.in_bl(parameter_name) is False:
description = f"HTTP Extracted Parameter [{parameter_name}] (Location Header)"
data = {
"host": parsed_url.hostname,
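
The excavate.py hunks combine three rules: F541 (f-string with no placeholders), E731 (lambda assigned to a name), and further E712 comparisons. A short sketch of the first two (illustrative only; abort_if mirrors the hunk above):

# F541: an f-string without placeholders is just a plain string literal.
rule_name = f"parameter_extraction"   # flagged by ruff F541
rule_name = "parameter_extraction"    # fixed

# E731: bind behavior with def instead of assigning a lambda;
# the def form gets a real __name__ and clearer tracebacks.
abort_if = lambda e: e.scope_distance > 0   # flagged by ruff E731

def abort_if(e):                            # fixed, as in the diff above
    return e.scope_distance > 0
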
8 changes: 4 additions & 4 deletions bbot/modules/paramminer_headers.py
@@ -172,7 +172,7 @@ async def handle_event(self, event):
self.debug(f"Error initializing compare helper: {e}")
return
batch_size = await self.count_test(url)
- if batch_size == None or batch_size <= 0:
+ if batch_size is None or batch_size <= 0:
self.debug(f"Failed to get baseline max {self.compare_mode} count, aborting")
return
self.debug(f"Resolved batch_size at {str(batch_size)}")
@@ -195,11 +195,11 @@ async def count_test(self, url):
baseline = await self.helpers.request(url)
if baseline is None:
return
- if str(baseline.status_code)[0] in ("4", "5"):
+ if str(baseline.status_code)[0] in {"4", "5"}:
return
for count, args, kwargs in self.gen_count_args(url):
r = await self.helpers.request(*args, **kwargs)
- if r is not None and not (str(r.status_code)[0] in ("4", "5")):
+ if r is not None and str(r.status_code)[0] not in {"4", "5"}:
return count

def gen_count_args(self, url):
@@ -222,7 +222,7 @@ async def binary_search(self, compare_helper, url, group, reasons=None, reflecti
elif len(group) > 1 or (len(group) == 1 and len(reasons) == 0):
for group_slice in self.helpers.split_list(group):
match, reasons, reflection, subject_response = await self.check_batch(compare_helper, url, group_slice)
- if match == False:
+ if match is False:
async for r in self.binary_search(compare_helper, url, group_slice, reasons, reflection):
yield r
else:
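
paramminer_headers.py covers the remaining rules: E711 (equality comparison with None) and E713 (a negated membership test written as "not x in y"); the same hunks also switch the status-code prefixes from tuples to set literals. A small sketch of the two rules (illustrative only, not taken from the codebase):

# E711: compare against None with "is" / "is not", not "==".
batch_size = None
if batch_size == None:                  # flagged by ruff E711
    print("no baseline")
if batch_size is None:                  # fixed
    print("no baseline")

# E713: spell a negated membership test with "not in".
status_code = "404"
if not status_code[0] in {"4", "5"}:    # flagged by ruff E713
    print("not an error response")
if status_code[0] not in {"4", "5"}:    # fixed, same behavior
    print("not an error response")
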
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -102,7 +102,7 @@ skip = "./docs/javascripts/vega*.js,./bbot/wordlists/*"
[tool.ruff]
line-length = 119
format.exclude = ["bbot/test/test_step_1/test_manager_*"]
lint.ignore = ["E402", "E711", "E712", "E713", "E721", "E731", "E741", "F401", "F403", "F405", "F541", "F601"]
lint.ignore = ["E402", "E721", "E741", "F401", "F403", "F405"]

[tool.poetry-dynamic-versioning]
enable = true