Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ruff comprehensions and performance #2021

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bbot/core/event/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ def __init__(
self._scope_distance = None
self._module_priority = None
self._resolved_hosts = set()
self.dns_children = dict()
self.raw_dns_records = dict()
self.dns_children = {}
self.raw_dns_records = {}
self._discovery_context = ""
self._discovery_context_regex = re.compile(r"\{(?:event|module)[^}]*\}")
self.web_spider_distance = 0
Expand Down Expand Up @@ -769,7 +769,7 @@ def json(self, mode="json", siem_friendly=False):
Returns:
dict: JSON-serializable dictionary representation of the event object.
"""
j = dict()
j = {}
# type, ID, scope description
for i in ("type", "id", "uuid", "scope_description", "netloc"):
v = getattr(self, i, "")
Expand Down Expand Up @@ -1277,7 +1277,7 @@ def __init__(self, *args, **kwargs):
@property
def resolved_hosts(self):
# TODO: remove this when we rip out httpx
return set(".".join(i.split("-")[1:]) for i in self.tags if i.startswith("ip-"))
return {".".join(i.split("-")[1:]) for i in self.tags if i.startswith("ip-")}

@property
def pretty_string(self):
Expand Down
2 changes: 1 addition & 1 deletion bbot/core/helpers/depsinstaller/installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def ansible_run(self, tasks=None, module=None, args=None, ansible_args=None):
return success, err

def read_setup_status(self):
setup_status = dict()
setup_status = {}
if self.setup_status_cache.is_file():
with open(self.setup_status_cache) as f:
with suppress(Exception):
Expand Down
4 changes: 2 additions & 2 deletions bbot/core/helpers/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def _baseline(self):
ddiff = DeepDiff(baseline_1_json, baseline_2_json, ignore_order=True, view="tree")
self.ddiff_filters = []

for k, v in ddiff.items():
for k in ddiff.keys():
for x in list(ddiff[k]):
log.debug(f"Added {k} filter for path: {x.path()}")
self.ddiff_filters.append(x.path())
Expand Down Expand Up @@ -140,7 +140,7 @@ def compare_headers(self, headers_1, headers_2):

ddiff = DeepDiff(headers_1, headers_2, ignore_order=True, view="tree")

for k, v in ddiff.items():
for k in ddiff.keys():
for x in list(ddiff[k]):
try:
header_value = str(x).split("'")[1]
Expand Down
2 changes: 1 addition & 1 deletion bbot/core/helpers/dns/brute.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def gen_random_subdomains(self, n=50):
for i in range(0, max(0, n - 5)):
d = delimiters[i % len(delimiters)]
l = lengths[i % len(lengths)]
segments = list(random.choice(self.devops_mutations) for _ in range(l))
segments = [random.choice(self.devops_mutations) for _ in range(l)]
segments.append(self.parent_helper.rand_string(length=8, digits=False))
subdomain = d.join(segments)
yield subdomain
Expand Down
6 changes: 3 additions & 3 deletions bbot/core/helpers/dns/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, socket_path, config={}, debug=False):
dns_omit_queries = self.dns_config.get("omit_queries", None)
if not dns_omit_queries:
dns_omit_queries = []
self.dns_omit_queries = dict()
self.dns_omit_queries = {}
for d in dns_omit_queries:
d = d.split(":")
if len(d) == 2:
Expand All @@ -72,7 +72,7 @@ def __init__(self, socket_path, config={}, debug=False):
self.wildcard_ignore = []
self.wildcard_ignore = tuple([str(d).strip().lower() for d in self.wildcard_ignore])
self.wildcard_tests = self.dns_config.get("wildcard_tests", 5)
self._wildcard_cache = dict()
self._wildcard_cache = {}
# since wildcard detection takes some time, This is to prevent multiple
# modules from kicking off wildcard detection for the same domain at the same time
self._wildcard_lock = NamedLock()
Expand All @@ -82,7 +82,7 @@ def __init__(self, socket_path, config={}, debug=False):
self._last_connectivity_warning = time.time()
# keeps track of warnings issued for wildcard detection to prevent duplicate warnings
self._dns_warnings = set()
self._errors = dict()
self._errors = {}
self._debug = self.dns_config.get("debug", False)
self._dns_cache = LRUCache(maxsize=10000)

Expand Down
18 changes: 9 additions & 9 deletions bbot/core/helpers/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,12 +921,12 @@ def extract_params_xml(xml_data, compare_mode="getparam"):

# Define valid characters for each mode based on RFCs
valid_chars_dict = {
"header": set(
"header": {
chr(c) for c in range(33, 127) if chr(c) in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_"
),
"getparam": set(chr(c) for c in range(33, 127) if chr(c) not in ":/?#[]@!$&'()*+,;="),
"postparam": set(chr(c) for c in range(33, 127) if chr(c) not in ":/?#[]@!$&'()*+,;="),
"cookie": set(chr(c) for c in range(33, 127) if chr(c) not in '()<>@,;:"/[]?={} \t'),
},
"getparam": {chr(c) for c in range(33, 127) if chr(c) not in ":/?#[]@!$&'()*+,;="},
"postparam": {chr(c) for c in range(33, 127) if chr(c) not in ":/?#[]@!$&'()*+,;="},
"cookie": {chr(c) for c in range(33, 127) if chr(c) not in '()<>@,;:"/[]?={} \t'},
}


Expand Down Expand Up @@ -1148,7 +1148,7 @@ def chain_lists(
"""
if isinstance(l, str):
l = [l]
final_list = dict()
final_list = {}
for entry in l:
for s in split_regex.split(entry):
f = s.strip()
Expand Down Expand Up @@ -1345,7 +1345,7 @@ def search_dict_by_key(key, d):
if isinstance(d, dict):
if key in d:
yield d[key]
for k, v in d.items():
for v in d.values():
yield from search_dict_by_key(key, v)
elif isinstance(d, list):
for v in d:
Expand Down Expand Up @@ -1412,7 +1412,7 @@ def search_dict_values(d, *regexes):
results.add(h)
yield result
elif isinstance(d, dict):
for _, v in d.items():
for v in d.values():
yield from search_dict_values(v, *regexes)
elif isinstance(d, list):
for v in d:
Expand Down Expand Up @@ -2397,7 +2397,7 @@ def in_exception_chain(e, exc_types):
... if not in_exception_chain(e, (KeyboardInterrupt, asyncio.CancelledError)):
... raise
"""
return any([isinstance(_, exc_types) for _ in get_exception_chain(e)])
return any(isinstance(_, exc_types) for _ in get_exception_chain(e))


def get_traceback_details(e):
Expand Down
2 changes: 1 addition & 1 deletion bbot/core/helpers/regex.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def findall_multi(self, compiled_regexes, *args, threads=10, **kwargs):
"""
if not isinstance(compiled_regexes, dict):
raise ValueError('compiled_regexes must be a dictionary like this: {"regex_name": <compiled_regex>}')
for k, v in compiled_regexes.items():
for v in compiled_regexes.values():
self.ensure_compiled_regex(v)

tasks = {}
Expand Down
6 changes: 3 additions & 3 deletions bbot/core/helpers/regexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
_ipv4_regex + r"\/[0-9]{1,2}",
_ipv6_regex + r"\/[0-9]{1,3}",
)
ip_range_regexes = list(re.compile(r, re.I) for r in _ip_range_regexes)
ip_range_regexes = [re.compile(r, re.I) for r in _ip_range_regexes]

# dns names with periods
_dns_name_regex = r"(?:\w(?:[\w-]{0,100}\w)?\.)+(?:[xX][nN]--)?[^\W_]{1,63}\.?"
Expand Down Expand Up @@ -64,14 +64,14 @@
_hostname_regex + r":[0-9]{1,5}",
r"\[" + _ipv6_regex + r"\]:[0-9]{1,5}",
)
open_port_regexes = list(re.compile(r, re.I) for r in _open_port_regexes)
open_port_regexes = [re.compile(r, re.I) for r in _open_port_regexes]

_url_regexes = (
r"https?://" + _dns_name_regex + r"(?::[0-9]{1,5})?(?:(?:/|\?).*)?",
r"https?://" + _hostname_regex + r"(?::[0-9]{1,5})?(?:(?:/|\?).*)?",
r"https?://\[" + _ipv6_regex + r"\](?::[0-9]{1,5})?(?:(?:/|\?).*)?",
)
url_regexes = list(re.compile(r, re.I) for r in _url_regexes)
url_regexes = [re.compile(r, re.I) for r in _url_regexes]

_double_slash_regex = r"/{2,}"
double_slash_regex = re.compile(_double_slash_regex)
Expand Down
2 changes: 1 addition & 1 deletion bbot/core/helpers/wordcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def mutations(self, words, max_mutations=None):
def mutate(self, word, max_mutations=None, mutations=None):
if mutations is None:
mutations = self.top_mutations(max_mutations)
for mutation, count in mutations.items():
for mutation in mutations.keys():
ret = []
for s in mutation:
if s is not None:
Expand Down
32 changes: 16 additions & 16 deletions bbot/core/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def _preloaded(self):
return self.__preloaded

def get_recursive_dirs(self, *dirs):
dirs = set(Path(d).resolve() for d in dirs)
dirs = {Path(d).resolve() for d in dirs}
for d in list(dirs):
if not d.is_dir():
continue
Expand Down Expand Up @@ -340,61 +340,61 @@ def preload_module(self, module_file):
# class attributes that are dictionaries
if type(class_attr) == ast.Assign and type(class_attr.value) == ast.Dict:
# module options
if any([target.id == "options" for target in class_attr.targets]):
if any(target.id == "options" for target in class_attr.targets):
config.update(ast.literal_eval(class_attr.value))
# module options
elif any([target.id == "options_desc" for target in class_attr.targets]):
elif any(target.id == "options_desc" for target in class_attr.targets):
options_desc.update(ast.literal_eval(class_attr.value))
# module metadata
elif any([target.id == "meta" for target in class_attr.targets]):
elif any(target.id == "meta" for target in class_attr.targets):
meta = ast.literal_eval(class_attr.value)

# class attributes that are lists
if type(class_attr) == ast.Assign and type(class_attr.value) == ast.List:
# flags
if any([target.id == "flags" for target in class_attr.targets]):
if any(target.id == "flags" for target in class_attr.targets):
for flag in class_attr.value.elts:
if type(flag.value) == str:
flags.add(flag.value)
# watched events
elif any([target.id == "watched_events" for target in class_attr.targets]):
elif any(target.id == "watched_events" for target in class_attr.targets):
for event_type in class_attr.value.elts:
if type(event_type.value) == str:
watched_events.add(event_type.value)
# produced events
elif any([target.id == "produced_events" for target in class_attr.targets]):
elif any(target.id == "produced_events" for target in class_attr.targets):
for event_type in class_attr.value.elts:
if type(event_type.value) == str:
produced_events.add(event_type.value)

# bbot module dependencies
elif any([target.id == "deps_modules" for target in class_attr.targets]):
elif any(target.id == "deps_modules" for target in class_attr.targets):
for dep_module in class_attr.value.elts:
if type(dep_module.value) == str:
deps_modules.add(dep_module.value)
# python dependencies
elif any([target.id == "deps_pip" for target in class_attr.targets]):
elif any(target.id == "deps_pip" for target in class_attr.targets):
for dep_pip in class_attr.value.elts:
if type(dep_pip.value) == str:
deps_pip.append(dep_pip.value)
elif any([target.id == "deps_pip_constraints" for target in class_attr.targets]):
elif any(target.id == "deps_pip_constraints" for target in class_attr.targets):
for dep_pip in class_attr.value.elts:
if type(dep_pip.value) == str:
deps_pip_constraints.append(dep_pip.value)
# apt dependencies
elif any([target.id == "deps_apt" for target in class_attr.targets]):
elif any(target.id == "deps_apt" for target in class_attr.targets):
for dep_apt in class_attr.value.elts:
if type(dep_apt.value) == str:
deps_apt.append(dep_apt.value)
# bash dependencies
elif any([target.id == "deps_shell" for target in class_attr.targets]):
elif any(target.id == "deps_shell" for target in class_attr.targets):
for dep_shell in class_attr.value.elts:
deps_shell.append(ast.literal_eval(dep_shell))
# ansible playbook
elif any([target.id == "deps_ansible" for target in class_attr.targets]):
elif any(target.id == "deps_ansible" for target in class_attr.targets):
ansible_tasks = ast.literal_eval(class_attr.value)
# shared/common module dependencies
elif any([target.id == "deps_common" for target in class_attr.targets]):
elif any(target.id == "deps_common" for target in class_attr.targets):
for dep_common in class_attr.value.elts:
if type(dep_common.value) == str:
deps_common.append(dep_common.value)
Expand Down Expand Up @@ -540,7 +540,7 @@ def recommend_dependencies(self, modules):
with suppress(KeyError):
choices.remove(modname)
if event_type not in resolve_choices:
resolve_choices[event_type] = dict()
resolve_choices[event_type] = {}
deps = resolve_choices[event_type]
self.add_or_create(deps, "required_by", modname)
for c in choices:
Expand Down Expand Up @@ -639,7 +639,7 @@ def modules_options(self, modules=None, mod_type=None):
def modules_options_table(self, modules=None, mod_type=None):
table = []
header = ["Config Option", "Type", "Description", "Default"]
for module_name, module_options in self.modules_options(modules, mod_type).items():
for module_options in self.modules_options(modules, mod_type).values():
table += module_options
return make_table(table, header)

Expand Down
4 changes: 2 additions & 2 deletions bbot/modules/azure_tenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async def query(self, domain):
status_code = getattr(r, "status_code", 0)
if status_code not in (200, 421):
self.verbose(f'Error retrieving azure_tenant domains for "{domain}" (status code: {status_code})')
return set(), dict()
return set(), {}
found_domains = list(set(await self.helpers.re.findall(self.d_xml_regex, r.text)))
domains = set()

Expand All @@ -116,7 +116,7 @@ async def query(self, domain):
self.scan.word_cloud.absorb_word(d)

r = await openid_task
openid_config = dict()
openid_config = {}
with suppress(Exception):
openid_config = r.json()

Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/columbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ async def parse_results(self, r, query):
results = set()
json = r.json()
if json and isinstance(json, list):
return set([f"{s.lower()}.{query}" for s in json])
return {f"{s.lower()}.{query}" for s in json}
return results
10 changes: 5 additions & 5 deletions bbot/modules/deadly/ffuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ffuf(BaseModule):

deps_common = ["ffuf"]

banned_characters = set([" "])
banned_characters = {" "}
blacklist = ["images", "css", "image"]

in_scope_only = True
Expand Down Expand Up @@ -122,7 +122,7 @@ async def baseline_ffuf(self, url, exts=[""], prefix="", suffix="", mode="normal
continue

# if the codes are different, we should abort, this should also be a warning, as it is highly unusual behavior
if len(set(d["status"] for d in canary_results)) != 1:
if len({d["status"] for d in canary_results}) != 1:
self.warning("Got different codes for each baseline. This could indicate load balancing")
filters[ext] = ["ABORT", "BASELINE_CHANGED_CODES"]
continue
Expand All @@ -148,7 +148,7 @@ async def baseline_ffuf(self, url, exts=[""], prefix="", suffix="", mode="normal
continue

# we start by seeing if all of the baselines have the same character count
if len(set(d["length"] for d in canary_results)) == 1:
if len({d["length"] for d in canary_results}) == 1:
self.debug("All baseline results had the same char count, we can make a filter on that")
filters[ext] = [
"-fc",
Expand All @@ -161,7 +161,7 @@ async def baseline_ffuf(self, url, exts=[""], prefix="", suffix="", mode="normal
continue

# if that doesn't work we can try words
if len(set(d["words"] for d in canary_results)) == 1:
if len({d["words"] for d in canary_results}) == 1:
self.debug("All baseline results had the same word count, we can make a filter on that")
filters[ext] = [
"-fc",
Expand All @@ -174,7 +174,7 @@ async def baseline_ffuf(self, url, exts=[""], prefix="", suffix="", mode="normal
continue

# as a last resort we will try lines
if len(set(d["lines"] for d in canary_results)) == 1:
if len({d["lines"] for d in canary_results}) == 1:
self.debug("All baseline results had the same word count, we can make a filter on that")
filters[ext] = [
"-fc",
Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/deadly/vhost.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class vhost(ffuf):
}

deps_common = ["ffuf"]
banned_characters = set([" ", "."])
banned_characters = {" ", "."}

in_scope_only = True

Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/dockerhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def handle_event(self, event):
async def handle_org_stub(self, event):
profile_name = event.data
# docker usernames are case sensitive, so if there are capitalizations we also try a lowercase variation
profiles_to_check = set([profile_name, profile_name.lower()])
profiles_to_check = {profile_name, profile_name.lower()}
for p in profiles_to_check:
api_url = f"{self.api_url}/users/{p}"
api_result = await self.helpers.request(api_url, follow_redirects=True)
Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/extractous.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class extractous(BaseModule):
scope_distance_modifier = 1

async def setup(self):
self.extensions = list(set([e.lower().strip(".") for e in self.config.get("extensions", [])]))
self.extensions = list({e.lower().strip(".") for e in self.config.get("extensions", [])})
return True

async def filter_event(self, event):
Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/filedownload.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class filedownload(BaseModule):
scope_distance_modifier = 3

async def setup(self):
self.extensions = list(set([e.lower().strip(".") for e in self.config.get("extensions", [])]))
self.extensions = list({e.lower().strip(".") for e in self.config.get("extensions", [])})
self.max_filesize = self.config.get("max_filesize", "10MB")
self.download_dir = self.scan.home / "filedownload"
self.helpers.mkdir(self.download_dir)
Expand Down
Loading
Loading