Use ops[testing] rather than ops-scenario<7. (#97)
Co-authored-by: PietroPasotti <[email protected]>
tonyandrewmeyer and PietroPasotti authored Nov 14, 2024
1 parent 6ab947b commit 14c27b4
Showing 7 changed files with 282 additions and 272 deletions.
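
The change swaps the standalone scenario package (ops-scenario < 7) for the ops.testing namespace that ops[testing] ships, so the same Scenario classes are now reached as testing.Context, testing.State, testing.Relation, and testing.Container. A minimal sketch of the new-style usage, assuming a hypothetical SketchCharm and metadata that are not part of this commit:

import ops
from ops import testing


class SketchCharm(ops.CharmBase):
    """Hypothetical minimal charm, used only to illustrate the new imports."""


# Everything previously imported from scenario now lives under ops.testing.
ctx = testing.Context(
    SketchCharm,
    meta={
        "name": "sketch",
        "containers": {"workload": {}},
        "requires": {"db": {"interface": "postgresql"}},
    },
)
state_in = testing.State(
    containers={testing.Container("workload", can_connect=True)},
    relations={testing.Relation("db", interface="postgresql")},
)
state_out = ctx.run(ctx.on.update_status(), state_in)

State, Container, and Relation are frozen in this API, and collections such as containers and relations are passed as sets rather than lists, which is why the diffs below also turn list literals into set literals.
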
75 changes: 38 additions & 37 deletions tests/test_coordinated_workers/test_coordinator.py
@@ -1,9 +1,9 @@
import dataclasses
import json

import ops
import pytest
from ops import Framework
from scenario import Container, Context, Relation, State
from ops import testing

from src.cosl.coordinated_workers.coordinator import (
ClusterRolesConfig,
@@ -16,15 +16,15 @@
@pytest.fixture
def coordinator_state():
requires_relations = {
endpoint: Relation(endpoint=endpoint, interface=interface["interface"])
endpoint: testing.Relation(endpoint=endpoint, interface=interface["interface"])
for endpoint, interface in {
"my-certificates": {"interface": "certificates"},
"my-logging": {"interface": "loki_push_api"},
"my-charm-tracing": {"interface": "tracing"},
"my-workload-tracing": {"interface": "tracing"},
}.items()
}
requires_relations["my-s3"] = Relation(
requires_relations["my-s3"] = testing.Relation(
"my-s3",
interface="s3",
remote_app_data={
@@ -34,35 +34,35 @@ def coordinator_state():
"secret-key": "my-secret-key",
},
)
requires_relations["cluster_worker0"] = Relation(
requires_relations["cluster_worker0"] = testing.Relation(
"my-cluster",
remote_app_name="worker0",
remote_app_data=ClusterRequirerAppData(role="read").dump(),
)
requires_relations["cluster_worker1"] = Relation(
requires_relations["cluster_worker1"] = testing.Relation(
"my-cluster",
remote_app_name="worker1",
remote_app_data=ClusterRequirerAppData(role="write").dump(),
)
requires_relations["cluster_worker2"] = Relation(
requires_relations["cluster_worker2"] = testing.Relation(
"my-cluster",
remote_app_name="worker2",
remote_app_data=ClusterRequirerAppData(role="backend").dump(),
)

provides_relations = {
endpoint: Relation(endpoint=endpoint, interface=interface["interface"])
endpoint: testing.Relation(endpoint=endpoint, interface=interface["interface"])
for endpoint, interface in {
"my-dashboards": {"interface": "grafana_dashboard"},
"my-metrics": {"interface": "prometheus_scrape"},
}.items()
}

return State(
containers=[
Container("nginx", can_connect=True),
Container("nginx-prometheus-exporter", can_connect=True),
],
return testing.State(
containers={
testing.Container("nginx", can_connect=True),
testing.Container("nginx-prometheus-exporter", can_connect=True),
},
relations=list(requires_relations.values()) + list(provides_relations.values()),
)

@@ -90,7 +90,7 @@ class MyCoordinator(ops.CharmBase):
},
}

def __init__(self, framework: Framework):
def __init__(self, framework: ops.Framework):
super().__init__(framework)
# Note: Here it is a good idea not to use context mgr because it is "ops aware"
self.coordinator = Coordinator(
@@ -133,48 +133,48 @@ def __init__(self, framework: Framework):


def test_worker_roles_subset_of_minimal_deployment(
coordinator_state: State, coordinator_charm: ops.CharmBase
coordinator_state: testing.State, coordinator_charm: ops.CharmBase
):
# Test that the combination of worker roles is a subset of the minimal deployment roles

# GIVEN a coordinator_charm
ctx = Context(coordinator_charm, meta=coordinator_charm.META)
ctx = testing.Context(coordinator_charm, meta=coordinator_charm.META)

# AND a coordinator_state defining relations to worker charms with incomplete distributed roles
missing_backend_worker_relation = [
missing_backend_worker_relation = {
relation
for relation in coordinator_state.relations
if relation.remote_app_name != "worker2"
]
}

# WHEN we process any event
with ctx.manager(
"update-status",
state=coordinator_state.replace(relations=missing_backend_worker_relation),
with ctx(
ctx.on.update_status(),
state=dataclasses.replace(coordinator_state, relations=missing_backend_worker_relation),
) as mgr:
charm: coordinator_charm = mgr.charm

# THEN the deployment is coherent
# THEN the deployment is not coherent
assert not charm.coordinator.is_coherent


def test_without_s3_integration_raises_error(
coordinator_state: State, coordinator_charm: ops.CharmBase
coordinator_state: testing.State, coordinator_charm: ops.CharmBase
):
# Test that a charm without an s3 integration raises S3NotFoundError

# GIVEN a coordinator charm without an s3 integration
ctx = Context(coordinator_charm, meta=coordinator_charm.META)
relations_without_s3 = [
ctx = testing.Context(coordinator_charm, meta=coordinator_charm.META)
relations_without_s3 = {
relation for relation in coordinator_state.relations if relation.endpoint != "my-s3"
]
}

# WHEN we process any event
with ctx.manager(
"update-status",
state=coordinator_state.replace(relations=relations_without_s3),
with ctx(
ctx.on.update_status(),
state=dataclasses.replace(coordinator_state, relations=relations_without_s3),
) as mgr:
# THEN the _s3_config method raises and S3NotFoundError
# THEN the _s3_config method raises an S3NotFoundError
with pytest.raises(S3NotFoundError):
mgr.charm.coordinator._s3_config

@@ -193,7 +193,7 @@ def test_without_s3_integration_raises_error(
),
)
def test_s3_integration(
coordinator_state: State,
coordinator_state: testing.State,
coordinator_charm: ops.CharmBase,
region,
endpoint,
@@ -206,7 +206,7 @@ def test_s3_integration(
# Test that a charm with a s3 integration gives the expected _s3_config

# GIVEN a coordinator charm with a s3 integration
ctx = Context(coordinator_charm, meta=coordinator_charm.META)
ctx = testing.Context(coordinator_charm, meta=coordinator_charm.META)
s3_relation = coordinator_state.get_relations("my-s3")[0]
relations_except_s3 = [
relation for relation in coordinator_state.relations if relation.endpoint != "my-s3"
@@ -224,13 +224,14 @@
}

# WHEN we process any event
with ctx.manager(
"update-status",
state=coordinator_state.replace(
relations=relations_except_s3 + [s3_relation.replace(remote_app_data=s3_app_data)]
with ctx(
ctx.on.update_status(),
state=dataclasses.replace(
coordinator_state,
relations=relations_except_s3
+ [dataclasses.replace(s3_relation, remote_app_data=s3_app_data)],
),
) as mgr:

# THEN the s3_connection_info method returns the expected data structure
coordinator: Coordinator = mgr.charm.coordinator
assert coordinator.s3_connection_info.region == region
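
Two further API changes recur throughout the file above: ctx.manager("event-name", state) becomes a direct call on the context with a bound event, ctx(ctx.on.update_status(), state), and because the state components are frozen dataclasses, State.replace()/Relation.replace() become dataclasses.replace(). A short sketch under the same assumptions as before (hypothetical SketchCharm and metadata):

import dataclasses

import ops
from ops import testing


class SketchCharm(ops.CharmBase):
    """Hypothetical charm, for illustration only."""


ctx = testing.Context(
    SketchCharm,
    meta={"name": "sketch", "requires": {"my-s3": {"interface": "s3"}}},
)
s3 = testing.Relation("my-s3", interface="s3")
state = testing.State(relations={s3})

# Frozen dataclasses: build modified copies with dataclasses.replace
# instead of the old .replace() helpers on State and Relation.
patched_s3 = dataclasses.replace(s3, remote_app_data={"bucket": "mybucket"})
state = dataclasses.replace(state, relations={patched_s3})

# Manager form: call the Context with a bound event rather than
# ctx.manager("update-status", state).
with ctx(ctx.on.update_status(), state) as mgr:
    charm = mgr.charm  # the charm instance is set up on entering the manager
    mgr.run()          # emit the event while still inside the manager
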
71 changes: 38 additions & 33 deletions tests/test_coordinated_workers/test_coordinator_status.py
@@ -1,11 +1,12 @@
import dataclasses
from unittest.mock import MagicMock, PropertyMock, patch

import httpx
import ops
import pytest
import tenacity
from lightkube import ApiError
from ops import ActiveStatus, BlockedStatus, CharmBase, Framework, WaitingStatus
from scenario import Container, Context, Relation, State
from ops import testing

from cosl.coordinated_workers.coordinator import ClusterRolesConfig, Coordinator
from cosl.coordinated_workers.interface import ClusterProviderAppData, ClusterRequirerAppData
@@ -19,9 +20,8 @@
)


class MyCoordCharm(CharmBase):

def __init__(self, framework: Framework):
class MyCoordCharm(ops.CharmBase):
def __init__(self, framework: ops.Framework):
super().__init__(framework)

self.coordinator = Coordinator(
@@ -54,7 +54,7 @@ def coord_charm():

@pytest.fixture
def ctx(coord_charm):
return Context(
return testing.Context(
coord_charm,
meta={
"name": "lilith",
@@ -80,7 +80,7 @@ def ctx(coord_charm):

@pytest.fixture()
def s3():
return Relation(
return testing.Relation(
"s3",
remote_app_data={
"access-key": "key",
@@ -98,33 +98,40 @@ def worker():
ClusterProviderAppData(worker_config="some: yaml").dump(app_data)
remote_app_data = {}
ClusterRequirerAppData(role="role").dump(remote_app_data)
return Relation("cluster", local_app_data=app_data, remote_app_data=remote_app_data)
return testing.Relation("cluster", local_app_data=app_data, remote_app_data=remote_app_data)


@pytest.fixture()
def base_state(s3, worker):

return State(
return testing.State(
leader=True,
containers=[Container("nginx"), Container("nginx-prometheus-exporter")],
relations=[worker, s3],
containers={testing.Container("nginx"), testing.Container("nginx-prometheus-exporter")},
relations={worker, s3},
)


def set_containers(state, nginx_can_connect=False, exporter_can_connect=False):
containers = {
testing.Container("nginx", can_connect=nginx_can_connect),
testing.Container("nginx-prometheus-exporter", can_connect=exporter_can_connect),
}
return dataclasses.replace(state, containers=containers)


@patch(
"charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.apply",
MagicMock(return_value=None),
)
def test_status_check_no_workers(ctx, base_state, s3, caplog):
# GIVEN the container cannot connect
state = base_state.with_can_connect("nginx", True)
state = state.replace(relations=[s3])
state = set_containers(base_state, True, False)
state = dataclasses.replace(state, relations={s3})

# WHEN we run any event
state_out = ctx.run("config_changed", state)
state_out = ctx.run(ctx.on.config_changed(), state)

# THEN the charm sets blocked
assert state_out.unit_status == BlockedStatus("[consistency] Missing any worker relation.")
assert state_out.unit_status == ops.BlockedStatus("[consistency] Missing any worker relation.")


@patch(
@@ -133,29 +140,28 @@ def test_status_check_no_workers(ctx, base_state, s3, caplog):
)
def test_status_check_no_s3(ctx, base_state, worker, caplog):
# GIVEN the container cannot connect
state = base_state.with_can_connect("nginx", True)
state = state.replace(relations=[worker])
state = set_containers(base_state, True, False)
state = dataclasses.replace(base_state, relations={worker})

# WHEN we run any event
state_out = ctx.run("config_changed", state)
state_out = ctx.run(ctx.on.config_changed(), state)

# THEN the charm sets blocked
assert state_out.unit_status == BlockedStatus("[s3] Missing S3 integration.")
assert state_out.unit_status == ops.BlockedStatus("[s3] Missing S3 integration.")


@patch(
"charms.observability_libs.v0.kubernetes_compute_resources_patch.KubernetesComputeResourcesPatch.get_status",
MagicMock(return_value=(BlockedStatus(""))),
MagicMock(return_value=(ops.BlockedStatus(""))),
)
def test_status_check_k8s_patch_failed(ctx, base_state, caplog):
# GIVEN the container can connect
state = base_state.with_can_connect("nginx", True)
state = base_state.with_can_connect("nginx-prometheus-exporter", True)
state = set_containers(base_state, True, True)

# WHEN we run any event
state_out = ctx.run("update_status", state)
state_out = ctx.run(ctx.on.update_status(), state)

assert state_out.unit_status == BlockedStatus("")
assert state_out.unit_status == ops.BlockedStatus("")


@patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher")
@@ -167,8 +173,7 @@ def test_status_check_k8s_patch_success_after_retries(
resource_patcher_mock, ctx, base_state, caplog
):
# GIVEN the container can connect
state = base_state.with_can_connect("nginx", True)
state = base_state.with_can_connect("nginx-prometheus-exporter", True)
state = set_containers(base_state, True, True)

# Retry on that error
response = httpx.Response(
Expand All @@ -180,14 +185,14 @@ def test_status_check_k8s_patch_success_after_retries(
# on collect-unit-status, the request patches are not yet reflected
with patch(
"cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch.get_status",
MagicMock(return_value=WaitingStatus("waiting")),
MagicMock(return_value=ops.WaitingStatus("waiting")),
):
state_intermediate = ctx.run("config_changed", state)
assert state_intermediate.unit_status == WaitingStatus("waiting")
state_intermediate = ctx.run(ctx.on.config_changed(), state)
assert state_intermediate.unit_status == ops.WaitingStatus("waiting")

with patch(
"cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch.get_status",
MagicMock(return_value=ActiveStatus("")),
MagicMock(return_value=ops.ActiveStatus("")),
):
state_out = ctx.run("update_status", state_intermediate)
assert state_out.unit_status == ActiveStatus("Degraded.")
state_out = ctx.run(ctx.on.update_status(), state_intermediate)
assert state_out.unit_status == ops.ActiveStatus("Degraded.")
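
The set_containers helper introduced above fills the role of the removed State.with_can_connect shortcut: because containers is now a frozen set on State, tests rebuild the set (or patch a single entry) instead of mutating it. A sketch of patching one container's can_connect flag, assuming a state that already holds the two nginx containers:

import dataclasses

from ops import testing

state = testing.State(
    containers={
        testing.Container("nginx", can_connect=False),
        testing.Container("nginx-prometheus-exporter", can_connect=False),
    }
)

# Look up one container, copy it with the flag flipped, and rebuild the set.
nginx = state.get_container("nginx")
others = {c for c in state.containers if c.name != "nginx"}
state = dataclasses.replace(
    state,
    containers=others | {dataclasses.replace(nginx, can_connect=True)},
)
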