Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

11 implementation of multiple fiware service support #31

Merged
merged 17 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 80 additions & 48 deletions backend/api/main.py

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions backend/gateway/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@
tls_context = None
REDIS_URL = settings.REDIS_URL
orion_url = settings.ORION_URL
service = settings.FIWARE_SERVICE
service_path = settings.FIWARE_SERVICEPATH

host = settings.POSTGRES_HOST
user = settings.POSTGRES_USER
password = settings.POSTGRES_PASSWORD
Expand Down Expand Up @@ -169,6 +166,12 @@ async def process_mqtt_message(
json.loads(message.decode("utf-8"))
)
if len(parsed_result) > 0:
# use the fiware_service from the datapoint if it exists,
# otherwise use the default one
if datapoint["fiware_service"]:
service = datapoint["fiware_service"]
else:
service = settings.FIWARE_SERVICE
value = parsed_result[0].value
payload = {
datapoint["attribute_name"]: value
Expand All @@ -178,10 +181,9 @@ async def process_mqtt_message(
await session.patch(
url=f"{orion_url}/v2/entities/{datapoint['entity_id']}/attrs?type={datapoint['entity_type']}&options=keyValues",
json=payload,
# TODO support other headers
headers={
"fiware-service": service,
"fiware-servicepath": service_path,
"fiware-servicepath": settings.FIWARE_SERVICEPATH,
},
)
logging.info(f"Sent {payload} to Orion Context Broker")
Expand Down Expand Up @@ -265,7 +267,7 @@ async def get_datapoints_by_topic(self, topic: str):
"""
async with self.conn.transaction():
records = await self.conn.fetch(
"SELECT object_id, jsonpath, entity_id, entity_type, attribute_name FROM datapoints WHERE topic = $1",
"SELECT object_id, jsonpath, entity_id, entity_type, attribute_name, fiware_service FROM datapoints WHERE topic = $1",
topic,
)
return [dict(record) for record in records]
Expand Down
25 changes: 23 additions & 2 deletions frontend/src/components/Form.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,20 @@
entity_id: null,
entity_type: null,
attribute_name: null,
connected: false
connected: false,
fiware_service: '',
};

let isMultiTenancy = false; // New state for multi-tenancy checkbox

// Reactive statement that updates whenever formState changes
$: newDatapoint.set(formState as Datapoint);

const handleSubmit = async (event: Event) => {
event.preventDefault();
if (!isMultiTenancy) {
formState.fiware_service = ''; // Clear fiware_service if multi-tenancy is not enabled
}
try {
await addData($newDatapoint);
await refreshData(); // Refresh the data after adding a new datapoint
Expand All @@ -33,8 +39,10 @@
entity_id: null,
entity_type: null,
attribute_name: null,
connected: false
connected: false,
fiware_service: '',
};
isMultiTenancy = false; // Reset the multi-tenancy checkbox
} catch (e) {
console.error('An error occurred while adding the data:', e);
}
Expand All @@ -53,6 +61,18 @@
<input type="text" id="topic" bind:value={formState.topic} required />
<label for="description">Description</label>
<input type="text" id="description" bind:value={formState.description} />

<!-- Multi-Tenancy Checkbox -->
<div class="multi-tenancy">
<label for="multiTenancy">Multi Tenancy</label>
<input type="checkbox" id="multiTenancy" bind:checked={isMultiTenancy} />
</div>

{#if isMultiTenancy}
<label for="fiware_service">FIWARE Service</label>
<input type="text" id="fiware_service" bind:value={formState.fiware_service} required />
{/if}

<div class="connected">
<label for="connected">Match Datapoint</label>
<input type="checkbox" id="connected" bind:checked={formState.connected} />
Expand Down Expand Up @@ -80,5 +100,6 @@
required
/>
{/if}

<button type="submit">Add Datapoint</button>
</form>
13 changes: 7 additions & 6 deletions frontend/src/components/Table.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import { updateData, deleteData } from '../services/api';
import { data, currentlyEditing, tempData } from '../stores/stores';
import { refreshData } from '../services/dataService';

import type { Datapoint, DatapointUpdate } from '../services/api';

let localTempData: DatapointUpdate = {
Expand All @@ -24,7 +23,7 @@
}

function cancelEditing(): void {
// This is called when the user clicks the cancel button after editing a datapoint in the table
// This is called when the user clicks the cancel button after editing a datapoint in the table
// It resets the currentlyEditing variable and the tempData variable and cancels the edit

currentlyEditing.set(null);
Expand Down Expand Up @@ -77,6 +76,7 @@
<th>Entity ID</th>
<th>Entity Type</th>
<th>Attribute Name</th>
<th>FIWARE Service</th> <!-- Add this line -->
<th>Status</th>
<th>Actions</th>
</tr>
Expand All @@ -94,30 +94,31 @@
{#if $currentlyEditing === row.object_id}
<input bind:value={localTempData.description} />
{:else}
{row.description || ''}
{row.description || ""}
{/if}
</td>
<td>
{#if $currentlyEditing === row.object_id}
<input bind:value={localTempData.entity_id} />
{:else}
{row.entity_id || ''}
{row.entity_id || ""}
{/if}
</td>
<td>
{#if $currentlyEditing === row.object_id}
<input bind:value={localTempData.entity_type} />
{:else}
{row.entity_type || ''}
{row.entity_type || ""}
{/if}
</td>
<td>
{#if $currentlyEditing === row.object_id}
<input bind:value={localTempData.attribute_name} />
{:else}
{row.attribute_name || ''}
{row.attribute_name || ""}
{/if}
</td>
<td>{row.fiware_service || ""}</td> <!-- Add this line -->
<td>{row.status ? "Match found" : "No match"}</td>
<td>
{#if $currentlyEditing === row.object_id}
Expand Down
14 changes: 10 additions & 4 deletions frontend/src/services/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface Datapoint {
entity_type: string | null; // Can be a string or null
attribute_name: string | null; // Can be a string or null
connected: boolean;
fiware_service?: string; // Can be a string or null
status?: string | boolean | null; // Can be a string, boolean, or null
}

Expand Down Expand Up @@ -43,7 +44,7 @@ export const fetchData = async (): Promise<Datapoint[]> => {
const response: Response = await fetch(`${API_URL}/data`);
const responseData = await response.json();
let data: Datapoint[] = await Promise.all(responseData.map(async row => {
row.status = await getStatus(row.object_id);
row.status = await getStatus(row.object_id, row.fiware_service);
return row;
}));
return data;
Expand All @@ -58,7 +59,8 @@ export const addData = async (data: Datapoint) => {
const response: Response = await fetch(`${API_URL}/data`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
'Content-Type': 'application/json',
'FIWARE-SERVICE': data.fiware_service || '',
},
body: JSON.stringify(payload)
});
Expand Down Expand Up @@ -100,8 +102,12 @@ export const deleteData = async (object_id: string): Promise<Datapoint | null> =
return responseData;
}

export const getStatus = async (object_id: string): Promise<boolean> => {
const response: Response = await fetch(`${API_URL}/data/${object_id}/status`);
export const getStatus = async (object_id: string, fiware_service: string): Promise<boolean> => {
const response: Response = await fetch(`${API_URL}/data/${object_id}/status`, {
headers: {
'fiware-service': fiware_service
}
});
if (!response.ok) {
throw new Error(`Failed to get status for datapoint with object_id ${object_id}`);
}
Expand Down
130 changes: 103 additions & 27 deletions tests/test_crud.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import json
import time
import requests
import re
import pydantic
import logging
from filip.clients.ngsi_v2 import ContextBrokerClient
from filip.models import FiwareHeader
from filip.models.ngsi_v2.context import ContextEntity
import importlib
from backend.api.main import Datapoint
from test_settings import settings
from tests.test_init import TestInit
import importlib


class TestCRUD(TestInit):
Expand Down Expand Up @@ -78,14 +83,51 @@ def test_create(self):
object_id4 = response4.json()["object_id"]
self.assertTrue(response4.ok)

def test_create_default_fiware_service(self):
headers = {
'Accept': 'application/json',
'fiware-service': settings.FIWARE_SERVICE
}

# Create datapoint without specifying fiware_service
datapoint = Datapoint(
**{
"topic": "topic/of/default",
"jsonpath": "$..data_default"
}
)
response = requests.request("POST", settings.GATEWAY_URL + "/data", headers=headers,
data=datapoint.json())
self.assertTrue(response.ok)
self.assertEqual(response.json()["fiware_service"], settings.FIWARE_SERVICE)

def test_create_custom_fiware_service(self):
headers = {
'Accept': 'application/json',
'fiware-service': 'custom_service'
}

# Create datapoint with custom fiware_service
datapoint = Datapoint(
**{
"topic": "topic/of/custom",
"jsonpath": "$..data_custom",
"fiware_service": "custom_service"
}
)
response = requests.request("POST", settings.GATEWAY_URL + "/data", headers=headers,
data=datapoint.json())
self.assertTrue(response.ok)
self.assertEqual(response.json()["fiware_service"], "custom_service")

def test_read(self):
headers = {
'Accept': 'application/json'
}
datapoint5 = Datapoint(
**{
"topic": "topic/of/crud",
"jsonpath": "$..data5"
"jsonpath": "$..dat5"
}
)
response = requests.request("POST", settings.GATEWAY_URL + "/data", headers=headers,
Expand Down Expand Up @@ -201,53 +243,88 @@ def test_match_datapoints(self):
'Accept': 'application/json'
}

# Create matched datapoint
matched_datapoint = Datapoint(
# create matched datapoint
datapoint_matched = Datapoint(
**{
"topic": "topic/of/match",
"jsonpath": "$..data_match",
"connected": True,
"entity_id": "dp:001",
"entity_type": "Device",
"attribute_name": "temperature"
"attribute_name": "temperature",
"fiware_service": "default_service"
}
)
response = requests.request("POST", settings.GATEWAY_URL + "/data", headers=headers,
data=matched_datapoint.json())
# Create a new entity and attribute to ensure they exist
fiware_header_1 = FiwareHeader(service=datapoint_matched.fiware_service)
with ContextBrokerClient(fiware_header=fiware_header_1,
url=settings.ORION_URL) as cbc:
attr2 = {'temperature': {'value': 0,
'type': 'Number'}}
self.test_entity_for_match = ContextEntity(
id=datapoint_matched.entity_id,
type=datapoint_matched.entity_type,
**attr2
)
cbc.post_entity(entity=self.test_entity_for_match, update=True)
time.sleep(1)
response = requests.request("POST", settings.GATEWAY_URL + "/data",
headers=headers,
data=datapoint_matched.json())
object_id = response.json()["object_id"]
logging.info(f"Created matched datapoint with object_id: {object_id}")
logging.info(f"Response for matched datapoint creation: {response.json()}")
self.assertTrue(response.ok)

# Verify match status
response = requests.request("GET", settings.GATEWAY_URL + "/data/" + object_id + "/status")
logging.info(f"Match status response for matched datapoint: {response.json()}")
print(f"Created matched datapoint with object_id: {object_id}")
print(f"Response for matched datapoint creation: {response.json()}")
self.assertTrue(response.ok)
self.assertTrue(response.json())

# Create non-matched datapoint
unmatched_datapoint = Datapoint(
# Verify entity creation in Context Broker
cb_headers = {'Accept': 'application/json',
'fiware-service': datapoint_matched.fiware_service}
cb_url = f"{settings.ORION_URL}/v2/entities/dp:001"
cb_response = requests.get(cb_url, headers=cb_headers)
print(f"Context Broker entity creation check URL: {cb_url}")
print(f"Context Broker entity creation check headers: {cb_headers}")
print(f"Context Broker entity creation check response: {cb_response.status_code} - {cb_response.text}")
self.assertTrue(cb_response.ok, "Entity creation in Context Broker failed")

# verify match status
status_url = f"{settings.GATEWAY_URL}/data/{object_id}/status"
status_response = requests.get(status_url)
print(f"Match status response for matched datapoint: {status_response.json()}")
print(f"Status check URL: {status_url}")
print(f"Status check headers: {headers}")
print(f"Status check response: {status_response.status_code} - {status_response.text}")
self.assertTrue(status_response.ok)
self.assertTrue(status_response.json())

# create non-matched datapoint
datapoint_no_matched = Datapoint(
**{
"topic": "topic/of/match",
"jsonpath": "$..data_nomatch",
"connected": True,
"entity_id": "NonExistentEntityID",
"entity_type": "NonExistentType",
"attribute_name": "NonExistentAttribute"
"attribute_name": "NonExistentAttribute",
"fiware_service": settings.FIWARE_SERVICE
}
)
response = requests.request("POST", settings.GATEWAY_URL + "/data", headers=headers,
data=unmatched_datapoint.json())
response = requests.request("POST", settings.GATEWAY_URL + "/data",
headers=headers,
data=datapoint_no_matched.json())
object_id = response.json()["object_id"]
logging.info(f"Created non-matched datapoint with object_id: {object_id}")
logging.info(f"Response for non-matched datapoint creation: {response.json()}")
print(f"Created non-matched datapoint with object_id: {object_id}")
print(f"Response for non-matched datapoint creation: {response.json()}")
self.assertTrue(response.ok)

# Verify non-match status
response = requests.request("GET", settings.GATEWAY_URL + "/data/" + object_id + "/status")
logging.info(f"Match status response for non-matched datapoint: {response.json()}")
self.assertTrue(response.ok)
self.assertFalse(response.json())
# verify non-match status
status_url = f"{settings.GATEWAY_URL}/data/{object_id}/status"
status_response = requests.get(status_url)
print(f"Match status response for non-matched datapoint: {status_response.json()}")
print(f"Status check URL: {status_url}")
print(f"Status check headers: {headers}")
print(f"Status check response: {status_response.status_code} - {status_response.text}")
self.assertTrue(status_response.ok)
self.assertFalse(status_response.json())

def test_object_id_immutable(self):
headers = {
Expand Down Expand Up @@ -334,7 +411,6 @@ def test_object_id_auto_generation(self):
response = requests.request("GET", settings.GATEWAY_URL + "/data/" + object_id)
self.assertTrue(response.ok)


def test_partial_update_patch(self):
headers = {
'Accept': 'application/json'
Expand Down
Loading
Loading