Skip to content

Commit

Permalink
use settings class inherited from baseSettings
Browse files Browse the repository at this point in the history
  • Loading branch information
elay committed Jun 12, 2024
1 parent f3d56e1 commit f3cd2ba
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 39 deletions.
6 changes: 3 additions & 3 deletions pcfuncs/ipban/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient

from .constants import BANNED_IP_TABLE, STORAGE_ACCOUNT_URL
from .config import settings
from .models import UpdateBannedIPTask


Expand All @@ -18,10 +18,10 @@ def main(mytimer: func.TimerRequest) -> None:
credential: DefaultAzureCredential = DefaultAzureCredential()
logs_query_client: LogsQueryClient = LogsQueryClient(credential)
table_service_client: TableServiceClient = TableServiceClient(
endpoint=STORAGE_ACCOUNT_URL, credential=credential
endpoint=settings.storage_account_url, credential=credential
)
table_client: TableClient = table_service_client.create_table_if_not_exists(
BANNED_IP_TABLE
settings.banned_ip_table
)
task: UpdateBannedIPTask = UpdateBannedIPTask(logs_query_client, table_client)
task.run()
19 changes: 19 additions & 0 deletions pcfuncs/ipban/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# config.py
from pydantic import BaseSettings


class Settings(BaseSettings):
# Constants related to Azure Table Storage
storage_account_url: str = "https://pctapisstagingsa.table.core.windows.net/"
banned_ip_table: str = "blobstoragebannedip"

# Log Analytics Workspace: pc-api-loganalytics
log_analytics_workspace_id: str = "78d48390-b6bb-49a9-b7fd-a86f6522e9c4"

# Time and threshold settings
time_window_in_hours: int = 24
threshold_read_count_in_gb: int = 5120


# Create a global settings instance
settings = Settings()
10 changes: 0 additions & 10 deletions pcfuncs/ipban/constants.py

This file was deleted.

16 changes: 6 additions & 10 deletions pcfuncs/ipban/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@
from azure.monitor.query import LogsQueryClient
from azure.monitor.query._models import LogsTableRow

from .constants import (
LOG_ANALYTICS_WORKSPACE_ID,
THRESHOLD_READ_COUNT_IN_GB,
TIME_WINDOW_IN_HOURS,
)
from .config import settings


class UpdateBannedIPTask:
Expand All @@ -29,14 +25,14 @@ def run(self) -> List[LogsTableRow]:
def get_blob_logs_query_result(self) -> List[LogsTableRow]:
query: str = f"""
StorageBlobLogs
| where TimeGenerated > ago({TIME_WINDOW_IN_HOURS}h)
| where TimeGenerated > ago({settings.time_window_in_hours}h)
| extend IpAddress = tostring(split(CallerIpAddress, ":")[0])
| summarize readcount = sum(ResponseBodySize) / (1024 * 1024 * 1024)
by IpAddress
| where readcount > {THRESHOLD_READ_COUNT_IN_GB}
| where readcount > {settings.threshold_read_count_in_gb}
"""
response: Any = self.log_query_client.query_workspace(
LOG_ANALYTICS_WORKSPACE_ID, query, timespan=None
settings.log_analytics_workspace_id, query, timespan=None
)
return response.tables[0].rows

Expand All @@ -53,8 +49,8 @@ def update_banned_ips(self, query_result: List[LogsTableRow]) -> None:
"PartitionKey": ip_address,
"RowKey": ip_address,
"ReadCount": read_count,
"Threshold": THRESHOLD_READ_COUNT_IN_GB,
"TimeWindow": TIME_WINDOW_IN_HOURS,
"Threshold": settings.threshold_read_count_in_gb,
"TimeWindow": settings.time_window_in_hours,
}

if ip_address in existing_ips:
Expand Down
28 changes: 12 additions & 16 deletions pcfuncs/tests/ipban/test_ipban.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient
from azure.monitor.query._models import LogsTableRow
from ipban.constants import (
STORAGE_ACCOUNT_URL,
THRESHOLD_READ_COUNT_IN_GB,
TIME_WINDOW_IN_HOURS,
)
from ipban.config import settings
from ipban.models import UpdateBannedIPTask
from pytest_mock import MockerFixture

Expand All @@ -26,22 +22,22 @@ def populate_banned_ip_table(table_client: TableClient) -> List[Dict[str, Any]]:
"PartitionKey": "192.168.1.1",
"RowKey": "192.168.1.1",
"ReadCount": 647,
"Threshold": THRESHOLD_READ_COUNT_IN_GB,
"TimeWindow": TIME_WINDOW_IN_HOURS,
"Threshold": settings.threshold_read_count_in_gb,
"TimeWindow": settings.time_window_in_hours,
},
{
"PartitionKey": "192.168.1.2",
"RowKey": "192.168.1.2",
"ReadCount": 214,
"Threshold": THRESHOLD_READ_COUNT_IN_GB,
"TimeWindow": TIME_WINDOW_IN_HOURS,
"Threshold": settings.threshold_read_count_in_gb,
"TimeWindow": settings.time_window_in_hours,
},
{
"PartitionKey": "192.168.1.3",
"RowKey": "192.168.1.3",
"ReadCount": 550,
"Threshold": THRESHOLD_READ_COUNT_IN_GB,
"TimeWindow": TIME_WINDOW_IN_HOURS,
"Threshold": settings.threshold_read_count_in_gb,
"TimeWindow": settings.time_window_in_hours,
},
]
for entity in entities:
Expand Down Expand Up @@ -96,7 +92,7 @@ def integration_clients(
credential: DefaultAzureCredential = DefaultAzureCredential()
logs_query_client: LogsQueryClient = LogsQueryClient(credential)
table_service_client: TableServiceClient = TableServiceClient(
endpoint=STORAGE_ACCOUNT_URL, credential=credential
endpoint=settings.storage_account_url, credential=credential
)
table_client: TableClient = table_service_client.create_table_if_not_exists(
TEST_BANNED_IP_TABLE
Expand All @@ -122,8 +118,8 @@ def test_update_banned_ip_integration(
for ip, expected_read_count in logs_query_result:
entity: TableEntity = table_client.get_entity(ip, ip)
assert entity["ReadCount"] == expected_read_count
assert entity["Threshold"] == THRESHOLD_READ_COUNT_IN_GB
assert entity["TimeWindow"] == TIME_WINDOW_IN_HOURS
assert entity["Threshold"] == settings.threshold_read_count_in_gb
assert entity["TimeWindow"] == settings.time_window_in_hours


def test_update_banned_ip(mock_clients: Tuple[MagicMock, TableClient]) -> None:
Expand All @@ -136,5 +132,5 @@ def test_update_banned_ip(mock_clients: Tuple[MagicMock, TableClient]) -> None:
for ip, expected_read_count in MOCK_LOGS_QUERY_RESULT:
entity = table_client.get_entity(ip, ip)
assert entity["ReadCount"] == expected_read_count
assert entity["Threshold"] == THRESHOLD_READ_COUNT_IN_GB
assert entity["TimeWindow"] == TIME_WINDOW_IN_HOURS
assert entity["Threshold"] == settings.threshold_read_count_in_gb
assert entity["TimeWindow"] == settings.time_window_in_hours

0 comments on commit f3cd2ba

Please sign in to comment.