Skip to content

Commit

Permalink
Use automatic importing for collecting built-in auditors and monitors.
Browse files Browse the repository at this point in the history
Therefore, monitors are moved into separate packages per entity.

Signed-off-by: Tim Walter <[email protected]>
  • Loading branch information
twwd committed Oct 8, 2024
1 parent 4972988 commit 4f8aa9e
Show file tree
Hide file tree
Showing 24 changed files with 273 additions and 264 deletions.
52 changes: 0 additions & 52 deletions kcwarden/auditors/client_auditor.py

This file was deleted.

19 changes: 0 additions & 19 deletions kcwarden/auditors/idp_auditor.py

This file was deleted.

16 changes: 0 additions & 16 deletions kcwarden/auditors/realm_auditor.py

This file was deleted.

10 changes: 0 additions & 10 deletions kcwarden/auditors/scope_auditor.py

This file was deleted.

12 changes: 12 additions & 0 deletions kcwarden/cli.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import argparse
import logging
import os
import sys
from importlib.metadata import version

from kcwarden.subcommands import download, audit, configuration, review
from kcwarden.utils.arguments import is_dir

logger = logging.getLogger(__name__)

LOG_FORMAT = "[%(asctime)s %(levelname)-s %(name)s] %(message)s"


def add_plugin_directory_argument(parser: argparse.ArgumentParser):
parser.add_argument(
Expand Down Expand Up @@ -160,6 +166,12 @@ def add_review_parser(subparsers):


def main(args: list[str] | None = None) -> int | None:
logging.basicConfig(
level=logging.DEBUG if os.environ.get("DEBUG", "false").lower() == "true" else logging.INFO, format=LOG_FORMAT
)

logger.debug("Started")

# Parse CLI args
args_ns = get_parsers().parse_args(args)
# Execute the subcommand
Expand Down
45 changes: 32 additions & 13 deletions kcwarden/configuration/auditors.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,48 @@
from pathlib import Path
from typing import Type

from kcwarden.auditors import client_auditor, realm_auditor, idp_auditor, scope_auditor
from kcwarden.auditors import (
realm as realm_auditors,
client as client_auditors,
idp as idp_auditors,
scope as scope_auditors,
)
from kcwarden.api import Auditor
from kcwarden.monitors import client_monitor, group_monitor, service_account_monitor, protocol_mapper_monitor
from kcwarden.utils import plugins
from kcwarden.monitors import (
client as client_monitors,
group as group_monitors,
service_account as service_account_monitors,
protocol_mapper as protocol_mapper_monitors,
)
from kcwarden.utils import auditor_importing


def collect_auditors(
requested_auditors: list[str] | None = None, additional_auditors_dirs: list[Path] | None = None
) -> list[Type[Auditor]]:
"""
Collect all relevant auditors for this run.
This includes built-in auditors and monitors plus the ones provided in optional plugin directories.
If not all auditors should be applied, the results are filtered for the requested ones.
"""
auditors = []
# TODO Add new auditor modules here
auditors.extend(client_auditor.AUDITORS)
auditors.extend(realm_auditor.AUDITORS)
auditors.extend(idp_auditor.AUDITORS)
auditors.extend(scope_auditor.AUDITORS)
auditors.extend(client_monitor.AUDITORS)
auditors.extend(group_monitor.AUDITORS)
auditors.extend(service_account_monitor.AUDITORS)
auditors.extend(protocol_mapper_monitor.AUDITORS)
for package in (
client_auditors,
realm_auditors,
idp_auditors,
scope_auditors,
client_monitors,
group_monitors,
service_account_monitors,
protocol_mapper_monitors,
):
auditors.extend(sorted(auditor_importing.get_auditors_from_package(package), key=lambda a: a.get_classname()))

if additional_auditors_dirs is not None:
for directory in additional_auditors_dirs:
auditors.extend(plugins.get_auditors(directory))
auditors.extend(
sorted(auditor_importing.get_auditors_from_directory(directory), key=lambda a: a.get_classname())
)

if requested_auditors is not None:
auditors = [auditor for auditor in auditors if auditor.get_classname() in requested_auditors]
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,56 +1,9 @@
from kcwarden.custom_types.keycloak_object import ClientScope, Client, ProtocolMapper
from kcwarden.custom_types.result import Severity
from kcwarden.api import Monitor
from kcwarden.custom_types.keycloak_object import ProtocolMapper, Client, ClientScope
from kcwarden.custom_types.result import Severity
from kcwarden.database import helper


class ClientWithSensitiveScope(Monitor):
"""Checks for the use of sensitive scopes.
In some situations, specific scopes should only be available for specific clients.
This Auditor checks which OIDC clients have a specific scope in their default or
optional scopes. You can define which clients you would expect to have access to
this scope in the config file. All other clients that have the scope are reported.
If no scopes are defined in the config file, this auditor will not run.
"""

DEFAULT_SEVERITY = Severity.Medium
SHORT_DESCRIPTION = "Unexpected client uses monitored sensitive scope"
LONG_DESCRIPTION = "In the configuration, you have defined this scope to be sensitive, and defined a set of expected clients that are allowed to use it. An unexpected client has been detected that has been assigned this scope as either optional or default scope. If this is expected, please add it to the allowlist in the configuration file."
REFERENCE = ""
HAS_CUSTOM_CONFIG = True
CUSTOM_CONFIG_TEMPLATE = {
"scope": "scope name or regular expression",
}

def audit(self):
custom_config = self.get_custom_config()
for monitor_definition in custom_config:
# Load config
monitored_scope: str = monitor_definition["scope"]
allowed_clients: list[str] = monitor_definition["allowed"]
# Skip default config entry, in case it was still present
if monitored_scope == self.CUSTOM_CONFIG_TEMPLATE["scope"]: # type: ignore - confused linter
continue
for client in self._DB.get_all_clients():
# if self.is_not_ignored(client) and (monitored_scope in client.get_default_client_scopes() or monitored_scope in client.get_optional_client_scopes()):
if self.is_not_ignored(client) and (
helper.regex_matches_list_entry(monitored_scope, client.get_default_client_scopes())
or helper.regex_matches_list_entry(monitored_scope, client.get_optional_client_scopes())
):
if not helper.matches_list_of_regexes(client.get_name(), allowed_clients):
yield self.generate_finding_with_severity_from_config(
client,
monitor_definition,
additional_details={
"monitored_scope": monitored_scope,
"default_scopes": client.get_default_client_scopes(),
"optional_scopes": client.get_optional_client_scopes(),
},
)


class ClientWithSensitiveRole(Monitor):
"""Checks for the use of sensitive roles.
Expand Down Expand Up @@ -225,6 +178,3 @@ def audit(self):
"matched_scope": scope,
},
)


AUDITORS = [ClientWithSensitiveRole, ClientWithSensitiveScope]
50 changes: 50 additions & 0 deletions kcwarden/monitors/client/client_with_sensitive_scope.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from kcwarden.api import Monitor
from kcwarden.custom_types.result import Severity
from kcwarden.database import helper


class ClientWithSensitiveScope(Monitor):
"""Checks for the use of sensitive scopes.
In some situations, specific scopes should only be available for specific clients.
This Auditor checks which OIDC clients have a specific scope in their default or
optional scopes. You can define which clients you would expect to have access to
this scope in the config file. All other clients that have the scope are reported.
If no scopes are defined in the config file, this auditor will not run.
"""

DEFAULT_SEVERITY = Severity.Medium
SHORT_DESCRIPTION = "Unexpected client uses monitored sensitive scope"
LONG_DESCRIPTION = "In the configuration, you have defined this scope to be sensitive, and defined a set of expected clients that are allowed to use it. An unexpected client has been detected that has been assigned this scope as either optional or default scope. If this is expected, please add it to the allowlist in the configuration file."
REFERENCE = ""
HAS_CUSTOM_CONFIG = True
CUSTOM_CONFIG_TEMPLATE = {
"scope": "scope name or regular expression",
}

def audit(self):
custom_config = self.get_custom_config()
for monitor_definition in custom_config:
# Load config
monitored_scope: str = monitor_definition["scope"]
allowed_clients: list[str] = monitor_definition["allowed"]
# Skip default config entry, in case it was still present
if monitored_scope == self.CUSTOM_CONFIG_TEMPLATE["scope"]: # type: ignore - confused linter
continue
for client in self._DB.get_all_clients():
# if self.is_not_ignored(client) and (monitored_scope in client.get_default_client_scopes() or monitored_scope in client.get_optional_client_scopes()):
if self.is_not_ignored(client) and (
helper.regex_matches_list_entry(monitored_scope, client.get_default_client_scopes())
or helper.regex_matches_list_entry(monitored_scope, client.get_optional_client_scopes())
):
if not helper.matches_list_of_regexes(client.get_name(), allowed_clients):
yield self.generate_finding_with_severity_from_config(
client,
monitor_definition,
additional_details={
"monitored_scope": monitored_scope,
"default_scopes": client.get_default_client_scopes(),
"optional_scopes": client.get_optional_client_scopes(),
},
)
Empty file.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from kcwarden.custom_types.result import Severity
from kcwarden.api import Monitor
from kcwarden.custom_types.result import Severity
from kcwarden.database import helper


Expand Down Expand Up @@ -56,6 +56,3 @@ def audit(self):
"effective_client_roles": group.get_effective_client_roles(),
},
)


AUDITORS = [GroupWithSensitiveRole]
Empty file.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from kcwarden.custom_types.keycloak_object import Client, ProtocolMapper
from kcwarden.custom_types.result import Severity
from kcwarden.api import Monitor
from kcwarden.custom_types.keycloak_object import ProtocolMapper, Client
from kcwarden.custom_types.result import Severity
from kcwarden.database import helper


Expand Down Expand Up @@ -119,6 +119,3 @@ def audit(self):
client, mapper, "optional_scope_defined_mapper", scope_name
),
)


AUDITORS = [ProtocolMapperWithConfig]
Empty file.
53 changes: 53 additions & 0 deletions kcwarden/monitors/service_account/service_account_with_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from kcwarden.api import Monitor
from kcwarden.custom_types.result import Severity
from kcwarden.database import helper


class ServiceAccountWithGroup(Monitor):
"""Checks for service accounts assigned to specific groups.
You may have a situation where you expect all service accounts to be assigned
to specific groups (e.g., "/TecUser"), or no service accounts to be
assigned to a group (e.g., "no service accounts should be assigned to /Customer").
This monitor allows you to check for any violations of these rules.
If no groups are defined in the config file, this auditor will not run.
"""

DEFAULT_SEVERITY = Severity.Medium
SHORT_DESCRIPTION = "Service Account in unexpected group"
LONG_DESCRIPTION = "In the configuration, you have defined rules for which groups service accounts are allowed to be assigned to. This service account violates these rules. If this is a mistake, add an exclusion in the configuration."
REFERENCE = ""
HAS_CUSTOM_CONFIG = True
CUSTOM_CONFIG_TEMPLATE = {"group": "/group path or regular expression", "allow_no_group": True}

def audit(self):
custom_config = self.get_custom_config()
for monitor_definition in custom_config:
# Load config
monitored_group: str = monitor_definition["group"]
allowed_service_accounts: list[str] = monitor_definition["allowed"]
allow_no_group: bool = monitor_definition["allow_no_group"]

# Skip default config entry, in case it was still present
if monitored_group == self.CUSTOM_CONFIG_TEMPLATE["group"]: # type: ignore - confused linter
continue

for saccount in self._DB.get_all_service_accounts():
assigned_groups = saccount.get_groups()

if not allow_no_group and assigned_groups == []:
yield self.generate_finding_with_severity_from_config(
saccount,
monitor_definition,
additional_details={"monitored_group": monitored_group, "assigned_groups": assigned_groups},
)
continue

if helper.regex_matches_list_entry(monitored_group, assigned_groups):
if not helper.matches_list_of_regexes(saccount.get_username(), allowed_service_accounts):
yield self.generate_finding_with_severity_from_config(
saccount,
monitor_definition,
additional_details={"monitored_group": monitored_group, "assigned_groups": assigned_groups},
)
Loading

0 comments on commit 4f8aa9e

Please sign in to comment.