-
-
Notifications
You must be signed in to change notification settings - Fork 52
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* add epss api logic * update: dependencies * improve: PinnedPages component responsiveness * add: EpssScore api/types * update: app theme store * add: VulnerabilityEpss component * precommit fixes --------- Co-authored-by: Davide Di Modica <[email protected]>
- Loading branch information
1 parent
3f6633a
commit 031ea37
Showing
21 changed files
with
1,219 additions
and
402 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from fastapi import APIRouter | ||
from fastapi import Security | ||
from loguru import logger | ||
|
||
from app.auth.utils import AuthHandler | ||
from app.threat_intel.schema.epss import EpssThreatIntelRequest | ||
from app.threat_intel.schema.epss import EpssThreatIntelResponse | ||
from app.threat_intel.services.epss import collect_epss_score | ||
|
||
# App specific imports | ||
|
||
threat_intel_epss_router = APIRouter() | ||
|
||
|
||
@threat_intel_epss_router.post( | ||
"/epss", | ||
response_model=EpssThreatIntelResponse, | ||
description="Threat Intel EPSS Score", | ||
dependencies=[Security(AuthHandler().require_any_scope("admin", "analyst"))], | ||
) | ||
async def threat_intel_epss( | ||
request: EpssThreatIntelRequest, | ||
): | ||
""" | ||
Endpoint for SocFortress Threat Intel. | ||
This endpoint allows authorized users with 'admin' or 'analyst' scope to perform SocFortress threat intelligence lookup. | ||
Parameters: | ||
- request: SocfortressThreatIntelRequest - The request payload containing the necessary information for the lookup. | ||
- session: AsyncSession (optional) - The database session to use for the lookup. | ||
- _key_exists: bool (optional) - A dependency to ensure the API key exists. | ||
Returns: | ||
- IoCResponse: The response model containing the results of the SocFortress threat intelligence lookup. | ||
""" | ||
logger.info(f"Received request for EPSS score: {request}") | ||
return await collect_epss_score(request) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from typing import List | ||
from typing import Optional | ||
|
||
from pydantic import BaseModel | ||
from pydantic import Field | ||
|
||
|
||
class EpssThreatIntelRequest(BaseModel): | ||
cve: str = Field( | ||
..., | ||
description="The CVE to evaluate.", | ||
) | ||
|
||
|
||
class EpssData(BaseModel): | ||
cve: str | ||
epss: str | ||
percentile: str | ||
date: str | ||
|
||
|
||
class EpssApiResponse(BaseModel): | ||
status: str | ||
status_code: int | ||
version: str | ||
access_control_allow_headers: Optional[str] | ||
access: str | ||
total: int | ||
offset: int | ||
limit: int | ||
data: List[EpssData] | ||
|
||
def to_dict(self): | ||
return self.dict() | ||
|
||
|
||
class EpssThreatIntelResponse(BaseModel): | ||
data: Optional[List[EpssData]] = Field(None, description="The data for the IoC") | ||
success: bool = Field(..., description="Indicates if it was successful") | ||
message: str = Field(None, description="Message about the IoC") | ||
the_epss_model: str = Field( | ||
"https://www.first.org/epss/model", | ||
description="The EPSS model description", | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import httpx | ||
from fastapi import HTTPException | ||
from loguru import logger | ||
|
||
from app.threat_intel.schema.epss import EpssData | ||
from app.threat_intel.schema.epss import EpssThreatIntelRequest | ||
from app.threat_intel.schema.epss import EpssThreatIntelResponse | ||
|
||
|
||
async def invoke_epss_api( | ||
url: str, | ||
request: EpssThreatIntelRequest, | ||
) -> dict: | ||
""" | ||
Invokes the Socfortress Process Analysis API with the provided API key, URL, and request parameters. | ||
Args: | ||
api_key (str): The API key for authentication. | ||
url (str): The URL of the Socfortress Intel URL | ||
request (SocfortressProcessNameAnalysisRequest): The request object containing the Process Name | ||
Returns: | ||
dict: The JSON response from the Process Name Analysis API. | ||
Raises: | ||
httpx.HTTPStatusError: If the API request fails with a non-successful status code. | ||
""" | ||
headers = {"content-type": "application/json"} | ||
params = {"cve": f"{request.cve}"} | ||
logger.info(f"Invoking EPSS with params: {params} and headers: {headers} and url: {url}") | ||
async with httpx.AsyncClient() as client: | ||
response = await client.get(url, headers=headers, params=params) | ||
return response.json() | ||
|
||
|
||
async def collect_epss_score( | ||
request: EpssThreatIntelRequest, | ||
) -> EpssThreatIntelResponse: | ||
""" | ||
Retrieves IoC response from Socfortress Threat Intel API. | ||
Args: | ||
request (SocfortressProcessNameAnalysisRequest): The request object containing the IoC data. | ||
session (AsyncSession): The async session object for making HTTP requests. | ||
Returns: | ||
SocfortressProcessNameAnalysisResponse: The response object containing the IoC data and success status. | ||
""" | ||
url = "https://api.first.org/data/v1/epss" | ||
response_data = await invoke_epss_api(url, request) | ||
|
||
# If status-code is not 200, raise an HTTPException | ||
if response_data.get("status-code") != 200: | ||
raise HTTPException( | ||
status_code=500, | ||
detail="Failed to retrieve EPSS score", | ||
) | ||
|
||
# Using .get() with default values | ||
data = [EpssData(**item) for item in response_data.get("data", [])] | ||
|
||
return EpssThreatIntelResponse(data=data, success=True, message="EPSS score retrieved successfully") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.