Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Merge pull request #14 from rly0nheart/dev
Browse files Browse the repository at this point in the history
Dev -> 1.1.0
  • Loading branch information
rly0nheart authored Oct 25, 2023
2 parents 20c704b + da6b291 commit fcf0e8a
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 212 deletions.
2 changes: 1 addition & 1 deletion netlasso/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__author__ = "Richard Mwewa"
__about__ = "https://about.me/rly0nheart"
__version__ = "1.0.0"
__version__ = "1.1.0"
__description__ = """
# Net Lasso
> **Net Lasso** utilises the [Netlas.io API](https://netlas.io/api) to perform advanced searches for internet-connected (IoT) devices based on user-provided search queries."""
Expand Down
78 changes: 50 additions & 28 deletions netlasso/api.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,90 @@
import aiohttp
import requests

from .coreutils import log
from .coreutils import log, __version__
from .messages import message


class API:
class Api:
BASE_NETLAS_API_ENDPOINT = "https://app.netlas.io/api"
BASE_GITHUB_API_ENDPOINT = "https://api.github.com"

def __init__(self):
self.search_endpoint = (
f"{API.BASE_NETLAS_API_ENDPOINT}/responses/?q=%s&start=%s"
f"{Api.BASE_NETLAS_API_ENDPOINT}/responses/?q=%s&start=%s"
)
self.updates_endpoint = (
f"{Api.BASE_GITHUB_API_ENDPOINT}/repos/rly0nheart/netlasso/releases/latest"
)
self.updates_endpoint = ""

@staticmethod
async def get_data(endpoint: str) -> dict:
def get_data(endpoint: str) -> dict:
"""
Asynchronously fetches JSON data from a given API endpoint.
Fetches JSON data from a given API endpoint.
:param endpoint: The API endpoint to fetch data from.
:return: A JSON object from the endpoint.
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(endpoint) as response:
if response.status == 200:
data = await response.json()
return data
else:
error_message = await response.json()
log.error(
message(
message_type="error",
message_key="api_error",
error_message=error_message,
)
with requests.get(url=endpoint) as response:
if response.status_code == 200:
return response.json()
else:
log.error(
message(
message_type="error",
message_key="api_error",
error_message=response.json(),
)
except aiohttp.ClientError as e:
)
except requests.exceptions.RequestException as error:
log.error(
message(
message_type="error",
message_key="http_error",
error_message=str(e),
error_message=error,
)
)
except Exception as e:
except Exception as error:
log.critical(
message(
message_type="error",
message_key="unexpected_error",
error_message=str(e),
error_message=error,
)
)

async def search(self, query: str, page: int):
def check_updates(self):
"""
Checks for updates from the program's releases.
If the release version does not match the current program version, assume the program is outdated.
"""
from rich.markdown import Markdown
from rich import print

response = self.get_data(endpoint=self.updates_endpoint)
if response:
remote_version = response.get("tag_name")
if remote_version != __version__:
log.info(
message(
message_type="info",
message_key="update",
program_name="Net Lasso",
program_call_name="netlasso",
version=remote_version,
)
)

release_notes = Markdown(response.get("body"))
print(release_notes)

def search(self, query: str, page: int):
"""
Searches Netlas.io and fetches search results that match a given query.
:param query: Search query string.
:param page: Page number to get results from (default is 0)
:return: A list of results that matched the query.
"""
search_results = await self.get_data(
endpoint=self.search_endpoint % (query, page)
)
search_results = self.get_data(endpoint=self.search_endpoint % (query, page))
return search_results.get("items")
155 changes: 155 additions & 0 deletions netlasso/masonry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from datetime import datetime
from typing import Union

from rich import print
from rich.tree import Tree

from .coreutils import format_api_data, save_data


class TreeMason:
def __init__(
self,
query: str,
results: list,
limit: int,
save_json: bool,
save_csv: bool,
return_raw: bool = False,
):
"""
Initialises the TreeMason class with results' data to populate the main_tree with.
:param query: A search query string.
:param results: A list of JSON objects each containing result data.
:param limit: Number of results to show.
:param save_json: A boolean value indicating whether each result should be saved to a JSON file.
:param save_csv: A boolean value indicating whether each result should be saved to a CSV file.
:param return_raw: A boolean value indicating whether results
should not be visualised in a tree and instead be printed in raw JSON format.
"""
self.results = results
self.limit = limit
self.save_json = save_json
self.save_csv = save_csv
self.return_raw = return_raw
self.main_tree = Tree(
f"Visualising [cyan]{self.limit}[/] results for [italic][yellow]{query}[/][/] - {datetime.now()}",
guide_style="bold bright_blue",
)

@staticmethod
def add_branch(
target_tree: Tree,
branch_title: str,
branch_data: Union[dict, list],
additional_text: str = None,
) -> Tree:
"""
Adds a branch to a specified tree and populates it with the given data.
:param target_tree: Tree to add the branch to.
:param branch_title: Branch title.
:param branch_data: Branch data
:param additional_text: Additional text to add at the end of the branch.
:return: A populated branch.
"""
branch = target_tree.add(branch_title)
data_types = [dict, list]
if type(branch_data) in data_types:
if type(branch_data) is dict:
for key, value in branch_data.items():
branch.add(f"{key}: {value}", style="dim")
if additional_text:
branch.add(additional_text, style="italic")
else:
for index, item in enumerate(branch_data, start=1):
branch.add(f"{index}. {item}", style="italic")
return branch

def result_branch(self, main_tree: Tree, result_data: dict) -> Tree:
"""
Adds a result_branch to the main_tree and populates it with result_data.
:param main_tree: Tree to add the branch to.
:param result_data: Data to populate the branch with.
:return: main_tree with the populated result_branch.
"""
summary_data = format_api_data(api_data=result_data, data_file="summary.json")

# Add a summary branch to the main_tree and populate it with the summary data
branch = main_tree.add(result_data.get("isp"))
for summary_key, summary_value in summary_data.items():
branch.add(f"{summary_key}: {summary_value}", style="dim")

# Add a location branch to the main_tree and populate it with location data
self.add_branch(
target_tree=branch,
branch_title="Location",
branch_data=format_api_data(
api_data=result_data.get("geo"), data_file="location.json"
),
)

# Add a WHOIS branch to the main_tree and populate it with WHOIS data (ASN, NET)
whois_branch = branch.add("WHOIS")
self.add_branch(
target_tree=whois_branch,
branch_title="Net",
branch_data=format_api_data(
api_data=result_data.get("whois").get("net"),
data_file="net.json",
),
additional_text=result_data.get("whois").get("net").get("description"),
)
self.add_branch(
target_tree=whois_branch,
branch_title="ASN",
branch_data=format_api_data(
api_data=result_data.get("whois").get("asn"),
data_file="asn.json",
),
)

# Add a Domains branch to the main_tree and populate it with a list of domains associated with the result
if result_data.get("domain"):
self.add_branch(
target_tree=branch,
branch_title="Domains",
branch_data=result_data.get("domain"),
)

return main_tree

def visualise_results(self):
"""
Saves and visualises the search results into a tree structure.
"""
if self.results:
for result_index, result in enumerate(self.results, start=1):
raw_result_data = result.get("data")

# If -r/--raw is passed, return results in raw JSON format
if self.return_raw:
print(raw_result_data)
print("\n")
else:
# Otherwise, populate the result_branch and add it to the main_tree
self.result_branch(
main_tree=self.main_tree, result_data=raw_result_data
)

# Save each result to a file (if the right command-line arguments are passed)
save_data(
data=raw_result_data,
save_to_json=self.save_json,
save_to_csv=self.save_csv,
filename=f"{raw_result_data.get('isp')}_{raw_result_data.get('ip')}",
)

# If result_index is equal to the limit, break the loop and visualise the main_tree
if result_index == self.limit:
break

if not self.return_raw:
print(self.main_tree)
2 changes: 1 addition & 1 deletion netlasso/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"help": "usage: [green]{program_call} -h/--help[/]",
"data_saved": "{data_type} written to [link file://{file_path}]{file_path}",
"update": "[bold]{program_name}[/] {version} is available. "
"To update, run: [italic][green]pip install --upgrade {program_call}[/][/]",
"To update, run: [italic][green]pip install --upgrade {program_call_name}[/][/]",
}


Expand Down
Loading

0 comments on commit fcf0e8a

Please sign in to comment.