Skip to content

Commit

Permalink
General cleanup and improvements (#187)
Browse files Browse the repository at this point in the history
* Added typing; dropped support for python<3.10; general improvements and cleanup
  • Loading branch information
P-T-I authored Oct 11, 2023
1 parent c6fb57e commit c260668
Show file tree
Hide file tree
Showing 24 changed files with 264 additions and 438 deletions.
2 changes: 1 addition & 1 deletion CveXplore/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.6
0.3.6.dev11
11 changes: 4 additions & 7 deletions CveXplore/api/connection/api_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,22 @@ def __init__(

self.collname = collname

def find(self, filter=None):
def find(self, the_filter: dict = None):
"""
Query the api endpoint as you would do so with a pymongo Collection.
:return: Reference to the CveSearchApi
:rtype: CveSearchApi
"""

return CveSearchApi(self, filter)
return CveSearchApi(self, the_filter)

def find_one(self, filter=None):
def find_one(self, the_filter: dict = None):
"""
Query the api endpoint as you would do so with a pymongo Collection.
:return: Data or None
:rtype: object
"""

cursor = self.find(filter)
cursor = self.find(the_filter)

for result in cursor.limit(-1):
return result
Expand Down
22 changes: 13 additions & 9 deletions CveXplore/api/nvd_nist/nvd_nist_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ def __init__(
self.set_header_field("apiKey", self.api_key)
self.api_key_limit = False
else:
self.logger.warning(
"Could not find a NIST API Key in the '~/.cvexplore/.env' file; "
"you could request one at: https://nvd.nist.gov/developers/request-an-api-key"
)
self.api_key_limit = True

self.filter_rejected = False
Expand Down Expand Up @@ -84,12 +88,12 @@ def _build_url(self, resource: dict = None, data: int = 1) -> str:

def _connect(
self,
method,
method: str,
resource: dict,
session: requests.Session,
data=None,
timeout=60,
return_response_object=False,
data: dict = None,
timeout: int = 60,
return_response_object: bool = False,
):

requests.packages.urllib3.disable_warnings()
Expand Down Expand Up @@ -236,11 +240,11 @@ def get_all_data(
class ApiData(object):
def __init__(
self,
results_per_page,
start_index,
total_results,
results_per_page: int,
start_index: int,
total_results: int,
api_handle: NvdNistApi,
data_source,
data_source: int,
resource=None,
):
self.results_per_page = results_per_page
Expand Down Expand Up @@ -372,7 +376,7 @@ def process_async(self):
return results

@retry(retry_policy)
async def fetch(self, session, url):
async def fetch(self, session: aiohttp.ClientSession, url: str):
try:
async with session.get(url) as response:
self.logger.debug(f"Sending request to url: {url}")
Expand Down
24 changes: 2 additions & 22 deletions CveXplore/common/cpe_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,9 @@
from CveXplore.database.helpers.cpe_conversion import cpe_uri_to_fs, cpe_fs_to_uri


def from2to3CPE(cpe, autofill=False):
def from2to3CPE(cpe: str, autofill: bool = False) -> str:
"""
Method to transform cpe2.2 to cpe2.3 format
:param cpe: cpe2.2 string
:type cpe: str
:param autofill: Whether to cpe string should be autofilled with double quotes and hyphens
:type autofill: bool
:return: cpe2.3 formatted string
:rtype: str
"""
cpe = cpe.strip()
if not cpe.startswith("cpe:2.3:"):
Expand All @@ -28,26 +21,13 @@ def from2to3CPE(cpe, autofill=False):
return cpe


def from3to2CPE(cpe):
def from3to2CPE(cpe: str) -> str:
"""
Method to transform cpe2.3 to cpe2.2 format
:param cpe: cpe2.3 string
:type cpe: str
:return: cpe2.2 string
:rtype: str
"""
cpe = cpe.strip()
if not cpe.startswith("cpe:/"):
if not cpe.startswith("cpe:2.3:"):
return False
cpe = cpe_fs_to_uri(cpe)
return cpe


def pad(seq, target_length, padding=None):
length = len(seq)
if length > target_length:
return seq
seq.extend([padding] * (target_length - length))
return seq
9 changes: 4 additions & 5 deletions CveXplore/common/data_source_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@

from CveXplore.api.connection.api_db import ApiDatabaseSource
from CveXplore.database.connection.mongo_db import MongoDBConnection
from CveXplore.objects.cvexplore_object import CveXploreObject


class DatasourceConnection(object):
class DatasourceConnection(CveXploreObject):
"""
The DatasourceConnection class handles the connection to the data source and is the base class for the database
objects and generic database functions
Expand All @@ -23,13 +24,11 @@ class DatasourceConnection(object):
else MongoDBConnection(**json.loads(os.getenv("MONGODB_CON_DETAILS")))
)

def __init__(self, collection):
def __init__(self, collection: str):
"""
Create a DatasourceConnection object
:param collection: Define the collection to connect to
:type collection: str
"""
super().__init__()
self.__collection = collection

@property
Expand Down
18 changes: 7 additions & 11 deletions CveXplore/database/connection/mongo_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,15 @@ class MongoDBConnection(object):
"""

def __init__(
self, host="mongodb://127.0.0.1:27017", port=None, database="cvedb", **kwargs
self,
host: str = "mongodb://127.0.0.1:27017",
port: int = None,
database: str = "cvedb",
**kwargs
):
"""
:param host: The `host` parameter can be a full `mongodb URI
<http://dochub.mongodb.org/core/connections>`_, in addition to
a simple hostname.
:type host: str
:param port: Port number (optional when a URI is used as host parameter)
:type port: int
:param database: Database to connect to; defaults to cvedb (Cve Search default)
:type database: str
:param kwargs: Other arguments supported by the MongoClient instantiation
:type kwargs: dict
The `host` parameter can be a full `mongodb URI <http://dochub.mongodb.org/core/connections>`_, in addition to
a simple hostname.
"""

self.client = None
Expand Down
4 changes: 2 additions & 2 deletions CveXplore/database/helpers/cpe_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@

# Convert cpe2.2 url encoded to cpe2.3 char escaped
# cpe:2.3:o:cisco:ios:12.2%281%29 to cpe:2.3:o:cisco:ios:12.2\(1\)
def unquote(cpe):
def unquote(cpe: str) -> str:
return re.compile("%([0-9a-fA-F]{2})", re.M).sub(
lambda m: "\\" + chr(int(m.group(1), 16)), cpe
)


# Convert cpe2.3 char escaped to cpe2.2 url encoded
# cpe:2.3:o:cisco:ios:12.2\(1\) to cpe:2.3:o:cisco:ios:12.2%281%29
def quote(cpe):
def quote(cpe: str) -> str:
cpe = cpe.replace("\\-", "-")
cpe = cpe.replace("\\.", ".")
return re.compile("\\\\(.)", re.M).sub(
Expand Down
64 changes: 25 additions & 39 deletions CveXplore/database/helpers/cvesearch_mongo_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,43 @@
Mongodb specific
================
"""

from pymongo.collection import Collection
from pymongo.cursor import Cursor


class CveSearchCollection(Collection):
"""
The CveSearchCollection is a custom Collection based on the pymongo Collection class which has been altered to
return a CveSearchCursor reference on the find method.
"""

def __init__(self, database, name: str, **kwargs):
"""
Get / create a custon cve-search Mongo collection.
"""

super().__init__(database, name, **kwargs)

def __repr__(self):
"""Return string representation of this class"""
return "<< CveSearchCollection:{} >>".format(self.name)

def find(self, *args, **kwargs):
"""
Query the database as you would do so with a pymongo Collection.
"""
return CveSearchCursor(self, *args, **kwargs)


class CveSearchCursor(Cursor):
"""
The CveSearchCursor is a custom cursor based on the pymongo cursor which will return database objects instead of
the raw data from the mongodb database.
"""

def __init__(self, collection, *args, **kwargs):
def __init__(self, collection: CveSearchCollection, *args, **kwargs):
"""
Create a new cve-search cursor.
:param collection: Reference to a CveSearchCollection object
:type collection: CveSearchCollection
"""
super().__init__(collection, *args, **kwargs)

Expand Down Expand Up @@ -77,37 +97,3 @@ def next(self):
def __repr__(self):
"""Return string representation of this class"""
return "<< CveSearchCursor:{} >>".format(self.collection)


class CveSearchCollection(Collection):
"""
The CveSearchCollection is a custom Collection based on the pymongo Collection class which has been altered to
return a CveSearchCursor reference on the find method.
"""

def __init__(self, database, name, **kwargs):
"""
Get / create a custon cve-search Mongo collection.
:param database: the database to get a collection from
:type database: MongoDBConnection
:param name: the name of the collection to get
:type name: str
:param kwargs: additional keyword arguments will be passed as options for the create collection command
:type kwargs: kwargs
"""

super().__init__(database, name, **kwargs)

def __repr__(self):
"""Return string representation of this class"""
return "<< CveSearchCollection:{} >>".format(self.name)

def find(self, *args, **kwargs):
"""
Query the database as you would do so with a pymongo Collection.
:return: Reference to the CveSearchCursor
:rtype: CveSearchCursor
"""
return CveSearchCursor(self, *args, **kwargs)
Loading

0 comments on commit c260668

Please sign in to comment.