Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement topology-discovery sync worker #60

Merged
merged 4 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion topology-discovery/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# 1.0.0
- Updated frinx-python-sdk to version ^1.1
- Updated frinx-python-sdk to version ^1.1

# 1.1.0
- Implemented worker and workflow for synchronization of the devices in the specified topology.
117 changes: 117 additions & 0 deletions topology-discovery/python/frinx_worker/topology_discovery/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from enum import Enum

import requests
from frinx.common.conductor_enums import TaskResultStatus
from frinx.common.frinx_rest import INVENTORY_HEADERS
from frinx.common.frinx_rest import T0POLOGY_URL_BASE
from frinx.common.type_aliases import DictAny
from frinx.common.worker.task_def import TaskOutput
from frinx.common.worker.task_result import TaskResult
from graphql_pydantic_converter.graphql_types import QueryForm
from pydantic import BaseModel
from pydantic import Field


class TopologyWorkerOutput(TaskOutput):
"""Topology-Discovery worker output."""

query: str = Field(
description="Constructed GraphQL query.",
)
variable: DictAny | None = Field(
description="Constructed input GraphQL variables.",
default=None
)
response_body: DictAny | None = Field(
description="Response body.",
)
response_code: int = Field(
description="Response code.",
)


class ResponseStatus(str, Enum):
"""Response status."""

DATA = "data"
"""Response contains valid data."""
ERRORS = "errors"
"""Response contains some errors."""
FAILED = "failed"
"""HTTP request failed without providing list of errors in response."""


class TopologyOutput(BaseModel):
"""Parsed response from Topology-Discovery service."""

status: ResponseStatus = Field(
description="Response status."
)
code: int = Field(
description="Parsed response code."
)
data: DictAny | None = Field(
default=None,
description="Structured response data."
)


def execute_graphql_operation(
query: str,
variables: DictAny | None = None,
topology_url_base: str = T0POLOGY_URL_BASE
) -> TopologyOutput:
"""
Execute GraphQL query.

:param query: GraphQL query
:param variables: GraphQL variables in dictionary format
:param topology_url_base: Topology-Discovery service URL base
:return: TopologyOutput object
""" ""
response = requests.post(
topology_url_base,
json={
"query": query,
"variables": variables
},
headers=INVENTORY_HEADERS
)
data = response.json()
status_code = response.status_code

if data.get("errors") is not None:
return TopologyOutput(data=data["errors"], status=ResponseStatus.ERRORS, code=status_code)

if data.get("data") is not None:
return TopologyOutput(data=data["data"], status=ResponseStatus.DATA, code=status_code)

return TopologyOutput(status=ResponseStatus.FAILED, code=status_code)


def response_handler(query: QueryForm, response: TopologyOutput) -> TaskResult:
"""
Handle response from Topology-Discovery service.

:param query: GraphQL query information
:param response: parsed topology-discovery response
:return: built TaskResult object
"""
output = TopologyWorkerOutput(
response_code=response.code,
response_body=response.data,
query=query.query,
variable=query.variable
)
match response.status:
case ResponseStatus.DATA:
task_result = TaskResult(status=TaskResultStatus.COMPLETED)
task_result.status = TaskResultStatus.COMPLETED
task_result.output = output
return task_result
case _:
task_result = TaskResult(status=TaskResultStatus.FAILED)
task_result.status = TaskResultStatus.FAILED
task_result.logs = str(response)
task_result.output = output
return task_result
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from frinx.common.type_aliases import ListStr
from frinx.common.worker.service import ServiceWorkersImpl
from frinx.common.worker.task_def import TaskDefinition
from frinx.common.worker.task_def import TaskExecutionProperties
from frinx.common.worker.task_def import TaskInput
from frinx.common.worker.task_result import TaskResult
from frinx.common.worker.worker import WorkerImpl
from frinx_api.topology_discovery import SyncMutation
from frinx_api.topology_discovery import SyncResponse
from frinx_api.topology_discovery import TopologyType
from pydantic import Field

from frinx_worker.topology_discovery.utils import TopologyOutput
from frinx_worker.topology_discovery.utils import TopologyWorkerOutput
from frinx_worker.topology_discovery.utils import execute_graphql_operation
from frinx_worker.topology_discovery.utils import response_handler


class TopologyDiscoveryWorkers(ServiceWorkersImpl):
class TopologySync(WorkerImpl):
_sync_topology: SyncMutation = SyncMutation(
topologyType=TopologyType.PHYSICAL_TOPOLOGY,
devices=["dev1"],
labels=["label1"],
payload=SyncResponse(
labels=True,
loadedDevices=True,
devicesMissingInInventory=True,
devicesMissingInUniconfig=True,
)
)

class ExecutionProperties(TaskExecutionProperties):
exclude_empty_inputs: bool = True

class WorkerDefinition(TaskDefinition):
name: str = "TOPOLOGY_sync"
description: str = "Synchronize specified devices in the topology"
labels: ListStr = ["BASIC", "TOPOLOGY"]
timeout_seconds: int = 3600
response_timeout_seconds: int = 3600

class WorkerInput(TaskInput):
devices: ListStr = Field(
description="List of device identifiers that should be synchronized",
examples=[["device_id_1", "device_id_2"]],
)
topology: TopologyType = Field(
description="To be synchronized topology type",
examples=[TopologyType.ETH_TOPOLOGY],
)
labels: ListStr | None = Field(
description="List of labels that are assigned to the synchronized devices",
examples=[["label_1", "label_2"]],
)

class WorkerOutput(TopologyWorkerOutput):
...

def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]:
self._sync_topology.topology_type = worker_input.topology
self._sync_topology.devices = worker_input.devices
self._sync_topology.labels = worker_input.labels

query = self._sync_topology.render()
response: TopologyOutput = execute_graphql_operation(query=query.query, variables=query.variable)
return response_handler(query, response)
Loading
Loading