Skip to content

Commit

Permalink
Change: Reuse matches in CPE match strings API
Browse files Browse the repository at this point in the history
When parsing the CPE match string JSON from the NVD API, the matching
CPEs are cached and reused if identical ones appear in another
match string.

This can save memory when processing a large number of CPE match strings
that contain duplicate matches.
  • Loading branch information
timopollmeier committed Jan 6, 2025
1 parent 2c27134 commit ff97772
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 10 deletions.
41 changes: 32 additions & 9 deletions pontos/nvd/cpe_match/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@
MAX_CPE_MATCHES_PER_PAGE = 500


def _result_iterator(data: JSON) -> Iterator[CPEMatchString]:
results: list[dict[str, Any]] = data.get("match_strings", []) # type: ignore
return (
CPEMatchString.from_dict(result["match_string"]) for result in results
)


class CPEMatchApi(NVDApi):
"""
API for querying the NIST NVD CPE match information.
Expand All @@ -62,6 +55,7 @@ def __init__(
token: Optional[str] = None,
timeout: Optional[Timeout] = DEFAULT_TIMEOUT_CONFIG,
rate_limit: bool = True,
cache_cpe_matches: bool = True,
) -> None:
"""
Create a new instance of the CPE API.
Expand All @@ -76,13 +70,22 @@ def __init__(
rolling 30 second window.
See https://nvd.nist.gov/developers/start-here#divRateLimits
Default: True.
cache_cpe_matches: If set to True (the default) the entries in the
lists of matching CPEs for each match string are cached and reused
to use less memory.
If set to False, a separate CPEMatch object is kept for each entry
to avoid possible side effects when modifying the data.
"""
super().__init__(
DEFAULT_NIST_NVD_CPE_MATCH_URL,
token=token,
timeout=timeout,
rate_limit=rate_limit,
)
if cache_cpe_matches:
self._cpe_match_cache = {}
else:
self._cpe_match_cache = None

def cpe_matches(
self,
Expand Down Expand Up @@ -157,12 +160,30 @@ def cpe_matches(
return NVDResults(
self,
params,
_result_iterator,
self._result_iterator,
request_results=request_results,
results_per_page=results_per_page,
start_index=start_index,
)

def _result_iterator(self, data: JSON) -> Iterator[CPEMatchString]:
"""
Creates an iterator of all the CPEMatchStrings in given API response JSON
Args:
data: The JSON response data to get the match strings from
Returns:
An iterator over the CPEMatchStrings
"""
results: list[dict[str, Any]] = data.get("match_strings", []) # type: ignore
return (
CPEMatchString.from_dict_with_cache(
result["match_string"], self._cpe_match_cache
)
for result in results
)

async def cpe_match(self, match_criteria_id: str) -> CPEMatchString:
"""
Returns a single CPE match for the given match criteria id.
Expand Down Expand Up @@ -201,7 +222,9 @@ async def cpe_match(self, match_criteria_id: str) -> CPEMatchString:
)

match_string = match_strings[0]
return CPEMatchString.from_dict(match_string["match_string"])
return CPEMatchString.from_dict_with_cache(
match_string["match_string"], self._cpe_match_cache
)

async def __aenter__(self) -> "CPEMatchApi":
await super().__aenter__()
Expand Down
29 changes: 28 additions & 1 deletion pontos/nvd/models/cpe_match_string.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from typing import Any, Dict, List, Optional
from uuid import UUID

from pontos.models import Model
Expand Down Expand Up @@ -55,3 +55,30 @@ class CPEMatchString(Model):
version_start_excluding: Optional[str] = None
version_end_including: Optional[str] = None
version_end_excluding: Optional[str] = None

@classmethod
def from_dict_with_cache(
    cls,
    data: Dict[str, Any],
    # Optional[...] instead of "... | None": the union operator on typing
    # generics is evaluated when the function is defined and fails on
    # Python versions older than 3.10.
    cpe_match_cache: Optional[Dict[str, CPEMatch]],
) -> "CPEMatchString":
    """
    Create a CPEMatchString model from a dict, reusing duplicate CPEMatch
    objects to reduce memory usage if a cache dict is given.

    Args:
        data: The JSON dict to generate the model from
        cpe_match_cache: A dictionary to store CPE matches in, or None
            to neither cache nor reuse CPE matches. The dictionary is
            updated in place and is keyed by the cpe_name_id of each
            match.
            NOTE(review): the annotation declares str keys; if
            cpe_name_id is a UUID the key type should be adjusted -
            confirm against the CPEMatch model.

    Returns:
        The new CPEMatchString. If a cache is given, entries of its
        matches list are replaced by the cached object whenever a match
        with the same cpe_name_id and the same cpe_name was seen before.
    """
    new_match_string = cls.from_dict(data)
    if cpe_match_cache is None:
        return new_match_string

    matches = new_match_string.matches
    for i, match in enumerate(matches):
        cached_match = cpe_match_cache.get(match.cpe_name_id)
        if cached_match is None:
            # First occurrence of this id: remember it for later reuse.
            cpe_match_cache[match.cpe_name_id] = match
        elif cached_match.cpe_name == match.cpe_name:
            # Only share the object if the name agrees as well. On a
            # mismatch the new object is kept and the cache is left
            # untouched.
            matches[i] = cached_match
    return new_match_string

0 comments on commit ff97772

Please sign in to comment.