Skip to content

Commit

Permalink
Fix scan profile db event issue by adding an explicit reference field…
Browse files Browse the repository at this point in the history
… (1.9) (#1093)

Co-authored-by: Donny Peeters <[email protected]>
Co-authored-by: ammar92 <[email protected]>
Co-authored-by: Patrick <[email protected]>
  • Loading branch information
4 people authored Jun 1, 2023
1 parent 8f66c88 commit 50a379a
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 133 deletions.
3 changes: 1 addition & 2 deletions octopoes/octopoes/core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ def save_origin(self, origin: Origin, oois: List[OOI], valid_time: datetime) ->

def _run_inference(self, origin: Origin, valid_time: datetime) -> None:
bit_definition = get_bit_definitions()[origin.method]

is_disabled = bit_definition.id in settings.bits_disabled or (
not bit_definition.default_enabled and bit_definition.id not in settings.bits_enabled
)
Expand Down Expand Up @@ -423,7 +422,7 @@ def _on_delete_origin_parameter(self, event: OriginParameterDBEvent) -> None:
return

def _run_inferences(self, event: ScanProfileDBEvent) -> None:
inference_origins = self.origin_repository.list_by_source(event.new_data.reference, valid_time=event.valid_time)
inference_origins = self.origin_repository.list_by_source(event.reference, valid_time=event.valid_time)
inference_origins = [o for o in inference_origins if o.origin_type == OriginType.INFERENCE]
for inference_origin in inference_origins:
self._run_inference(inference_origin, event.valid_time)
Expand Down
3 changes: 2 additions & 1 deletion octopoes/octopoes/events/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ def primary_key(self) -> str:

class ScanProfileDBEvent(DBEvent):
entity_type: Literal["scan_profile"] = "scan_profile"
reference: Reference
old_data: Optional[ScanProfile]
new_data: Optional[ScanProfile]

@property
def primary_key(self) -> Reference:
return self.new_data.reference if self.new_data else self.old_data.reference
return self.reference


EVENT_TYPE = Union[OOIDBEvent, OriginDBEvent, OriginParameterDBEvent, ScanProfileDBEvent]
94 changes: 46 additions & 48 deletions octopoes/octopoes/events/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,60 +54,58 @@ def publish(self, event: DBEvent) -> None:
event.client,
)

if isinstance(event, ScanProfileDBEvent):
incremented = (event.operation_type == OperationType.CREATE and event.new_data.level > 0) or (
event.operation_type == OperationType.UPDATE and event.new_data.level > event.old_data.level
)
if incremented:
ooi = json.dumps(
{
"primary_key": event.new_data.reference,
"object_type": event.new_data.reference.class_,
"scan_profile": event.new_data.dict(),
}
)

self.channel.basic_publish(
"",
f"{event.client}__scan_profile_increments",
ooi.encode(),
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent,
),
)

logger.info(
"Published scan_profile_increment [primary_key=%s] [level=%s]",
format_id_short(event.primary_key),
event.new_data.level,
)

# publish mutations
mutation = ScanProfileMutation(
operation=event.operation_type,
primary_key=event.primary_key,
)
if not isinstance(event, ScanProfileDBEvent):
return

if event.operation_type != OperationType.DELETE:
mutation.value = AbstractOOI(
primary_key=event.new_data.reference,
object_type=event.new_data.reference.class_,
scan_profile=event.new_data,
)
incremented = (event.operation_type == OperationType.CREATE and event.new_data.level > 0) or (
event.operation_type == OperationType.UPDATE
and event.old_data
and event.new_data.level > event.old_data.level
)

if incremented:
ooi = json.dumps(
{
"primary_key": event.reference,
"object_type": event.reference.class_,
"scan_profile": event.new_data.dict(),
}
)

self.channel.basic_publish(
"",
f"{event.client}__scan_profile_mutations",
mutation.json().encode(),
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent,
),
f"{event.client}__scan_profile_increments",
ooi.encode(),
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)

level = mutation.value.scan_profile.level if mutation.value != OperationType.DELETE else None
logger.info(
"Published scan profile mutation [operation_type=%s] [primary_key=%s] [level=%s]",
mutation.operation,
"Published scan_profile_increment [primary_key=%s] [level=%s]",
format_id_short(event.primary_key),
level,
event.new_data.level,
)

# publish mutations
mutation = ScanProfileMutation(operation=event.operation_type, primary_key=event.primary_key)

if event.operation_type != OperationType.DELETE:
mutation.value = AbstractOOI(
primary_key=event.new_data.reference,
object_type=event.new_data.reference.class_,
scan_profile=event.new_data,
)

self.channel.basic_publish(
"",
f"{event.client}__scan_profile_mutations",
mutation.json().encode(),
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)

level = mutation.value.scan_profile.level if mutation.value is not None else None
logger.info(
"Published scan profile mutation [operation_type=%s] [primary_key=%s] [level=%s]",
mutation.operation,
format_id_short(event.primary_key),
level,
)
2 changes: 2 additions & 0 deletions octopoes/octopoes/repositories/scan_profile_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def save(
event = ScanProfileDBEvent(
operation_type=OperationType.CREATE if old_scan_profile is None else OperationType.UPDATE,
valid_time=valid_time,
reference=new_scan_profile.reference,
old_data=old_scan_profile,
new_data=new_scan_profile,
)
Expand All @@ -118,6 +119,7 @@ def delete(self, scan_profile: ScanProfileBase, valid_time: datetime) -> None:

event = ScanProfileDBEvent(
operation_type=OperationType.DELETE,
reference=scan_profile.reference,
valid_time=valid_time,
old_data=scan_profile,
)
Expand Down
24 changes: 23 additions & 1 deletion octopoes/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
from unittest.mock import Mock

import pytest
from bits.runner import BitRunner
from requests.adapters import HTTPAdapter, Retry

from octopoes.api.api import app
from octopoes.api.router import settings
from octopoes.config.settings import Settings, XTDBType
from octopoes.core.app import get_xtdb_client
from octopoes.models import OOI, EmptyScanProfile, Reference, ScanProfileBase
from octopoes.core.service import OctopoesService
from octopoes.models import OOI, DeclaredScanProfile, EmptyScanProfile, Reference, ScanProfileBase
from octopoes.models.path import Direction, Path
from octopoes.models.types import DNSZone, Hostname, IPAddressV4, Network, ResolvedHostname
from octopoes.repositories.ooi_repository import OOIRepository
Expand Down Expand Up @@ -146,6 +148,16 @@ def resolved_hostname(hostname, ipaddressv4, ooi_repository, scan_profile_reposi
)


@pytest.fixture
def empty_scan_profile():
return EmptyScanProfile(reference="test_reference")


@pytest.fixture
def declared_scan_profile():
return DeclaredScanProfile(reference="test_reference", level=2)


@pytest.fixture
def xtdbtype_multinode():
def get_settings_override():
Expand All @@ -161,6 +173,16 @@ def app_settings():
return Settings(xtdb_type=XTDBType.XTDB_MULTINODE)


@pytest.fixture
def octopoes_service() -> OctopoesService:
return OctopoesService(Mock(), Mock(), Mock(), Mock())


@pytest.fixture
def bit_runner(mocker) -> BitRunner:
return mocker.patch("octopoes.core.service.BitRunner")


@pytest.fixture
def xtdb_http_client(app_settings: Settings) -> XTDBHTTPClient:
client = get_xtdb_client(app_settings.xtdb_uri, "test", app_settings.xtdb_type)
Expand Down
173 changes: 173 additions & 0 deletions octopoes/tests/test_event_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import uuid
from datetime import datetime

import pika

from octopoes.events.events import OOIDBEvent, OperationType, ScanProfileDBEvent
from octopoes.events.manager import EventManager


def test_event_manager_create_ooi(mocker, network):
celery_mock = mocker.Mock()
channel_mock = mocker.Mock()

mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
manager = EventManager("test", celery_mock, "queue", channel_mock)
event = OOIDBEvent(operation_type=OperationType.CREATE, valid_time=datetime(2023, 1, 1), new_data=network)
manager.publish(event)

celery_mock.send_task.assert_called_once_with(
"octopoes.tasks.tasks.handle_event",
(
{
"entity_type": "ooi",
"operation_type": "create",
"valid_time": "2023-01-01T00:00:00",
"client": "test",
"old_data": None,
"new_data": {
"object_type": "Network",
"scan_profile": None,
"primary_key": "Network|internet",
"name": "internet",
},
},
),
queue="queue",
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
)

channel_mock.basic_publish.assert_not_called()


def test_event_manager_create_empty_scan_profile(mocker, empty_scan_profile):
celery_mock = mocker.Mock()
channel_mock = mocker.Mock()

mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
manager = EventManager("test", celery_mock, "queue", channel_mock)
event = ScanProfileDBEvent(
operation_type=OperationType.CREATE,
valid_time=datetime(2023, 1, 1),
new_data=empty_scan_profile,
reference="test_reference",
)
manager.publish(event)

celery_mock.send_task.assert_called_once_with(
"octopoes.tasks.tasks.handle_event",
(
{
"entity_type": "scan_profile",
"operation_type": "create",
"valid_time": "2023-01-01T00:00:00",
"client": "test",
"old_data": None,
"new_data": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0},
"reference": "test_reference",
},
),
queue="queue",
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
)

channel_mock.basic_publish.assert_called_once_with(
"",
"test__scan_profile_mutations",
b'{"operation": "create", "primary_key": "test_reference", '
b'"value": {"primary_key": "test_reference", '
b'"object_type": "test_reference", '
b'"scan_profile": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0}}}',
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)


def test_event_manager_create_declared_scan_profile(mocker, declared_scan_profile):
celery_mock = mocker.Mock()
channel_mock = mocker.Mock()

mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
manager = EventManager("test", celery_mock, "queue", channel_mock)
event = ScanProfileDBEvent(
operation_type=OperationType.CREATE,
valid_time=datetime(2023, 1, 1),
new_data=declared_scan_profile,
reference="test_reference",
)
manager.publish(event)

celery_mock.send_task.assert_called_once_with(
"octopoes.tasks.tasks.handle_event",
(
{
"entity_type": "scan_profile",
"operation_type": "create",
"valid_time": "2023-01-01T00:00:00",
"client": "test",
"old_data": None,
"new_data": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2},
"reference": "test_reference",
},
),
queue="queue",
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
)

assert channel_mock.basic_publish.call_count == 2
channel_mock.basic_publish.asset_has_calls(
mocker.call(
"",
"test__scan_profile_increments",
b'{"primary_key": "test_reference", "object_type": "test_reference",'
b'"scan_profile": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2}}',
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
),
mocker.call(
"",
"test__scan_profile_mutations",
b'{"operation": "create", "primary_key": "test_reference", '
b'"value": {"primary_key": "test_reference", '
b'"object_type": "test_reference", '
b'"scan_profile": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2}}}',
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
),
)


def test_event_manager_delete_empty_scan_profile(mocker, empty_scan_profile):
celery_mock = mocker.Mock()
channel_mock = mocker.Mock()

mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
manager = EventManager("test", celery_mock, "queue", channel_mock)
event = ScanProfileDBEvent(
operation_type=OperationType.DELETE,
valid_time=datetime(2023, 1, 1),
old_data=empty_scan_profile,
reference="test_reference",
)
manager.publish(event)

celery_mock.send_task.assert_called_once_with(
"octopoes.tasks.tasks.handle_event",
(
{
"entity_type": "scan_profile",
"operation_type": "delete",
"valid_time": "2023-01-01T00:00:00",
"client": "test",
"old_data": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0},
"new_data": None,
"reference": "test_reference",
},
),
queue="queue",
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
)

channel_mock.basic_publish.assert_called_once_with(
"",
"test__scan_profile_mutations",
b'{"operation": "delete", "primary_key": "test_reference", "value": null}',
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)
Loading

0 comments on commit 50a379a

Please sign in to comment.