Skip to content

Commit

Permalink
[RTC-435] Add track, peer metadata and track metadata (#27)
Browse files Browse the repository at this point in the history
* Add tracks

* WIP update api

* Lint

* Add examples

* Improve docs of server notifications

* endlines

* Format

* Update proto

* Update client
  • Loading branch information
roznawsk authored Feb 7, 2024
1 parent 52da564 commit a27015e
Show file tree
Hide file tree
Showing 30 changed files with 820 additions and 33 deletions.
16 changes: 16 additions & 0 deletions docs/server_notifications.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Server Notifications

The Jellyfish can send one of the following notifications:

`ServerMessageRoomCreated`,
`ServerMessageRoomDeleted`,
`ServerMessageRoomCrashed`,
`ServerMessagePeerConnected`,
`ServerMessagePeerDisconnected`,
`ServerMessagePeerCrashed`,
`ServerMessageComponentCrashed`,
`ServerMessageTrackAdded`,
`ServerMessageTrackMetadataUpdated`,
`ServerMessageTrackRemoved`,
`ServerMessageHlsPlayable`,
`ServerMessageMetricsReport`
17 changes: 17 additions & 0 deletions examples/room_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from jellyfish import ComponentOptionsHLS, PeerOptionsWebRTC, RoomApi

# Create a room
room_api = RoomApi(server_address="localhost:5002", server_api_token="development")

jellyfish_address, room = room_api.create_room(
video_codec="h264", webhook_url="http://localhost:5000/webhook"
)
print((jellyfish_address, room))

# Add peer to the room
peer_token, peer_webrtc = room_api.add_peer(room.id, options=PeerOptionsWebRTC())
print((peer_token, peer_webrtc))

# Add component to the room
component_hls = room_api.add_component(room.id, options=ComponentOptionsHLS())
print(component_hls)
38 changes: 38 additions & 0 deletions examples/server_notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio

from jellyfish import Notifier, RoomApi
from jellyfish.events import ServerMessageTrackAdded, ServerMessageTrackType

notifier = Notifier(server_address="localhost:5002", server_api_token="development")


@notifier.on_server_notification
def handle_notification(server_notification):
print(f"Received a notification: {server_notification}")

if isinstance(server_notification, ServerMessageTrackAdded):
if server_notification.track.type == ServerMessageTrackType.TRACK_TYPE_AUDIO:
print("New audio track has been added")
elif server_notification.track.type == ServerMessageTrackType.TRACK_TYPE_VIDEO:
print("New video track has been added")


@notifier.on_metrics
def handle_metrics(metrics_report):
print(f"Received WebRTC metrics: {metrics_report}")


async def test_notifier():
notifier_task = asyncio.create_task(notifier.connect())

# Wait for notifier to be ready to receive messages
await notifier.wait_ready()

# Create a room to trigger a server notification
room_api = RoomApi()
room_api.create_room()

await notifier_task


asyncio.run(test_notifier())
4 changes: 2 additions & 2 deletions jellyfish/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
)

# API
from jellyfish._webhook_notifier import receive_json
from jellyfish._webhook_notifier import receive_binary
from jellyfish._ws_notifier import Notifier
from jellyfish.api._recording_api import RecordingApi
from jellyfish.api._room_api import RoomApi
Expand All @@ -38,7 +38,7 @@
"RoomApi",
"RecordingApi",
"Notifier",
"receive_json",
"receive_binary",
"Room",
"RoomConfig",
"RoomConfigVideoCodec",
Expand Down
Empty file.
128 changes: 128 additions & 0 deletions jellyfish/_openapi_client/api/default/healthcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from http import HTTPStatus
from typing import Any, Dict, Optional, Union

import httpx

from ... import errors
from ...client import AuthenticatedClient, Client
from ...models.healthcheck_response import HealthcheckResponse
from ...types import Response


def _get_kwargs() -> Dict[str, Any]:
return {
"method": "get",
"url": "/health",
}


def _parse_response(
*, client: Union[AuthenticatedClient, Client], response: httpx.Response
) -> Optional[HealthcheckResponse]:
if response.status_code == HTTPStatus.OK:
response_200 = HealthcheckResponse.from_dict(response.json())

return response_200
if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
response_500 = HealthcheckResponse.from_dict(response.json())

return response_500
if client.raise_on_unexpected_status:
raise errors.UnexpectedStatus(response.status_code, response.content)
else:
return None


def _build_response(
*, client: Union[AuthenticatedClient, Client], response: httpx.Response
) -> Response[HealthcheckResponse]:
return Response(
status_code=HTTPStatus(response.status_code),
content=response.content,
headers=response.headers,
parsed=_parse_response(client=client, response=response),
)


def sync_detailed(
*,
client: Union[AuthenticatedClient, Client],
) -> Response[HealthcheckResponse]:
"""Describes the health of Jellyfish
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Response[HealthcheckResponse]
"""

kwargs = _get_kwargs()

response = client.get_httpx_client().request(
**kwargs,
)

return _build_response(client=client, response=response)


def sync(
*,
client: Union[AuthenticatedClient, Client],
) -> Optional[HealthcheckResponse]:
"""Describes the health of Jellyfish
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
HealthcheckResponse
"""

return sync_detailed(
client=client,
).parsed


async def asyncio_detailed(
*,
client: Union[AuthenticatedClient, Client],
) -> Response[HealthcheckResponse]:
"""Describes the health of Jellyfish
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Response[HealthcheckResponse]
"""

kwargs = _get_kwargs()

response = await client.get_async_httpx_client().request(**kwargs)

return _build_response(client=client, response=response)


async def asyncio(
*,
client: Union[AuthenticatedClient, Client],
) -> Optional[HealthcheckResponse]:
"""Describes the health of Jellyfish
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
HealthcheckResponse
"""

return (
await asyncio_detailed(
client=client,
)
).parsed
12 changes: 12 additions & 0 deletions jellyfish/_openapi_client/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
from .component_properties_rtsp import ComponentPropertiesRTSP
from .component_rtsp import ComponentRTSP
from .error import Error
from .health_report import HealthReport
from .health_report_distribution import HealthReportDistribution
from .health_report_status import HealthReportStatus
from .healthcheck_response import HealthcheckResponse
from .peer import Peer
from .peer_details_response import PeerDetailsResponse
from .peer_details_response_data import PeerDetailsResponseData
Expand All @@ -30,6 +34,8 @@
from .rooms_listing_response import RoomsListingResponse
from .s3_credentials import S3Credentials
from .subscription_config import SubscriptionConfig
from .track import Track
from .track_type import TrackType

__all__ = (
"AddComponentJsonBody",
Expand All @@ -47,6 +53,10 @@
"ComponentPropertiesRTSP",
"ComponentRTSP",
"Error",
"HealthcheckResponse",
"HealthReport",
"HealthReportDistribution",
"HealthReportStatus",
"Peer",
"PeerDetailsResponse",
"PeerDetailsResponseData",
Expand All @@ -62,4 +72,6 @@
"RoomsListingResponse",
"S3Credentials",
"SubscriptionConfig",
"Track",
"TrackType",
)
19 changes: 19 additions & 0 deletions jellyfish/_openapi_client/models/component_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

if TYPE_CHECKING:
from ..models.component_properties_file import ComponentPropertiesFile
from ..models.track import Track


T = TypeVar("T", bound="ComponentFile")
Expand All @@ -18,6 +19,8 @@ class ComponentFile:

id: str
"""Assigned component ID"""
tracks: List["Track"]
"""List of all component's tracks"""
type: str
"""Component type"""
properties: Union[Unset, "ComponentPropertiesFile"] = UNSET
Expand All @@ -28,6 +31,12 @@ class ComponentFile:
def to_dict(self) -> Dict[str, Any]:
"""@private"""
id = self.id
tracks = []
for tracks_item_data in self.tracks:
tracks_item = tracks_item_data.to_dict()

tracks.append(tracks_item)

type = self.type
properties: Union[Unset, Dict[str, Any]] = UNSET
if not isinstance(self.properties, Unset):
Expand All @@ -38,6 +47,7 @@ def to_dict(self) -> Dict[str, Any]:
field_dict.update(
{
"id": id,
"tracks": tracks,
"type": type,
}
)
Expand All @@ -50,10 +60,18 @@ def to_dict(self) -> Dict[str, Any]:
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
"""@private"""
from ..models.component_properties_file import ComponentPropertiesFile
from ..models.track import Track

d = src_dict.copy()
id = d.pop("id")

tracks = []
_tracks = d.pop("tracks")
for tracks_item_data in _tracks:
tracks_item = Track.from_dict(tracks_item_data)

tracks.append(tracks_item)

type = d.pop("type")

_properties = d.pop("properties", UNSET)
Expand All @@ -65,6 +83,7 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:

component_file = cls(
id=id,
tracks=tracks,
type=type,
properties=properties,
)
Expand Down
19 changes: 19 additions & 0 deletions jellyfish/_openapi_client/models/component_hls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

if TYPE_CHECKING:
from ..models.component_properties_hls import ComponentPropertiesHLS
from ..models.track import Track


T = TypeVar("T", bound="ComponentHLS")
Expand All @@ -18,6 +19,8 @@ class ComponentHLS:
"""Assigned component ID"""
properties: "ComponentPropertiesHLS"
"""Properties specific to the HLS component"""
tracks: List["Track"]
"""List of all component's tracks"""
type: str
"""Component type"""
additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict)
Expand All @@ -28,6 +31,12 @@ def to_dict(self) -> Dict[str, Any]:
id = self.id
properties = self.properties.to_dict()

tracks = []
for tracks_item_data in self.tracks:
tracks_item = tracks_item_data.to_dict()

tracks.append(tracks_item)

type = self.type

field_dict: Dict[str, Any] = {}
Expand All @@ -36,6 +45,7 @@ def to_dict(self) -> Dict[str, Any]:
{
"id": id,
"properties": properties,
"tracks": tracks,
"type": type,
}
)
Expand All @@ -46,17 +56,26 @@ def to_dict(self) -> Dict[str, Any]:
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
"""@private"""
from ..models.component_properties_hls import ComponentPropertiesHLS
from ..models.track import Track

d = src_dict.copy()
id = d.pop("id")

properties = ComponentPropertiesHLS.from_dict(d.pop("properties"))

tracks = []
_tracks = d.pop("tracks")
for tracks_item_data in _tracks:
tracks_item = Track.from_dict(tracks_item_data)

tracks.append(tracks_item)

type = d.pop("type")

component_hls = cls(
id=id,
properties=properties,
tracks=tracks,
type=type,
)

Expand Down
Loading

0 comments on commit a27015e

Please sign in to comment.