Skip to content

Commit

Permalink
Add proxies for MSE and WebRTC live view websockets (blakeblackshear#365
Browse files Browse the repository at this point in the history
)

* Add proxy views for new live view types and fix jsmpeg

* Fix current tests

* Adjust path

* Restore frigate integration path

* Restore frigate integration path

* Add other tests

* Fix tests

* Formatting

* Remove copied code
  • Loading branch information
NickM-27 authored and dermotduffy committed Dec 11, 2022
1 parent 97ce2d8 commit fff2ea1
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 16 deletions.
30 changes: 29 additions & 1 deletion custom_components/frigate/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ def async_setup(hass: HomeAssistant) -> None:
"""Set up the views."""
session = async_get_clientsession(hass)
hass.http.register_view(JSMPEGProxyView(session))
hass.http.register_view(MSEProxyView(session))
hass.http.register_view(WebRTCProxyView(session))
hass.http.register_view(NotificationsProxyView(session))
hass.http.register_view(SnapshotsProxyView(session))
hass.http.register_view(RecordingProxyView(session))
Expand Down Expand Up @@ -479,7 +481,33 @@ class JSMPEGProxyView(WebsocketProxyView):

def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return f"live/{kwargs['path']}"
return f"live/jsmpeg/{kwargs['path']}"


class MSEProxyView(WebsocketProxyView):
"""A proxy for MSE websocket."""

url = "/api/frigate/{frigate_instance_id:.+}/mse/{path:.+}"
extra_urls = ["/api/frigate/mse/{path:.+}"]

name = "api:frigate:mse"

def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return f"live/mse/{kwargs['path']}"


class WebRTCProxyView(WebsocketProxyView):
"""A proxy for WebRTC websocket."""

url = "/api/frigate/{frigate_instance_id:.+}/webrtc/{path:.+}"
extra_urls = ["/api/frigate/webrtc/{path:.+}"]

name = "api:frigate:webrtc"

def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return f"live/webrtc/{kwargs['path']}"


def _init_header(request: web.Request) -> CIMultiDict | dict[str, str]:
Expand Down
204 changes: 189 additions & 15 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,12 @@ async def handler(request: web.Request) -> web.Response:
web.get("/api/events/1577854800.123456-random/snapshot.jpg", handler),
web.get("/api/events/1635807600.123456-random/snapshot.jpg", handler),
web.get("/api/events/1635807359.123456-random/snapshot.jpg", handler),
web.get("/live/front_door", ws_echo_handler),
web.get("/live/querystring", ws_qs_echo_handler),
web.get("/live/jsmpeg/front_door", ws_echo_handler),
web.get("/live/jsmpeg/querystring", ws_qs_echo_handler),
web.get("/live/mse/front_door", ws_echo_handler),
web.get("/live/mse/querystring", ws_qs_echo_handler),
web.get("/live/webrtc/front_door", ws_echo_handler),
web.get("/live/webrtc/querystring", ws_qs_echo_handler),
web.get(
"/api/front_door/start/1664067600.02/end/1664068200.03/clip.mp4",
handler,
Expand Down Expand Up @@ -797,42 +801,92 @@ async def test_jsmpeg_frame_type_ping_pong(
assert result[1].data == b"\x00\x01"


async def test_ws_proxy_specify_protocol(
async def test_jsmpeg_connection_reset(
local_frigate: Any,
hass_client: Any,
) -> None:
"""Test websocket proxy handles the SEC_WEBSOCKET_PROTOCOL header."""
"""Test JSMPEG proxying handles connection resets."""

# Tricky: This test is intended to test a ConnectionResetError to the
# Frigate server, which is the _second_ call to send*. The first call (from
# this test) needs to succeed.
real_send_str = views.aiohttp.web.WebSocketResponse.send_str

called_once = False

async def send_str(*args: Any, **kwargs: Any) -> None:
nonlocal called_once
if called_once:
raise ConnectionResetError
else:
called_once = True
return await real_send_str(*args, **kwargs)

authenticated_hass_client = await hass_client()

ws = await authenticated_hass_client.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/front_door",
headers={hdrs.SEC_WEBSOCKET_PROTOCOL: "foo,bar"},
)
assert ws
await ws.close()
with patch(
"custom_components.frigate.views.aiohttp.ClientWebSocketResponse.send_str",
new=send_str,
):
async with authenticated_hass_client.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/front_door"
) as ws:
await ws.send_str("data")


async def test_ws_proxy_query_string(
async def test_mse_text_binary(
local_frigate: Any,
hass: Any,
hass_client: Any,
) -> None:
"""Test websocket proxy passes on the querystring."""
"""Test JSMPEG proxying text/binary data."""

authenticated_hass_client = await hass_client()

async with authenticated_hass_client.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/querystring?key=value",
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/mse/front_door"
) as ws:
# Test sending text data.
result = await asyncio.gather(
ws.send_str("hello!"),
ws.receive(),
)
assert result[1].type == aiohttp.WSMsgType.TEXT
assert result[1].data == "hello!"

# Test sending binary data.
result = await asyncio.gather(
ws.send_bytes(b"\x00\x01"),
ws.receive(),
)

async def test_jsmpeg_connection_reset(
assert result[1].type == aiohttp.WSMsgType.BINARY
assert result[1].data == b"\x00\x01"


async def test_mse_frame_type_ping_pong(
local_frigate: Any,
hass_client: Any,
) -> None:
"""Test JSMPEG proxying handles ping-pong."""

authenticated_hass_client = await hass_client()

async with authenticated_hass_client.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/mse/front_door"
) as ws:
await ws.ping()

# Push some data through after the ping.
result = await asyncio.gather(
ws.send_bytes(b"\x00\x01"),
ws.receive(),
)
assert result[1].type == aiohttp.WSMsgType.BINARY
assert result[1].data == b"\x00\x01"


async def test_mse_connection_reset(
local_frigate: Any,
hass_client: Any,
) -> None:
Expand Down Expand Up @@ -860,11 +914,131 @@ async def send_str(*args: Any, **kwargs: Any) -> None:
new=send_str,
):
async with authenticated_hass_client.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/front_door"
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/mse/front_door"
) as ws:
await ws.send_str("data")


async def test_webrtc_text_binary(
local_frigate: Any,
hass: Any,
hass_client: Any,
) -> None:
"""Test JSMPEG proxying text/binary data."""

authenticated_hass_client = await hass_client()

async with authenticated_hass_client.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/webrtc/front_door"
) as ws:
# Test sending text data.
result = await asyncio.gather(
ws.send_str("hello!"),
ws.receive(),
)
assert result[1].type == aiohttp.WSMsgType.TEXT
assert result[1].data == "hello!"

# Test sending binary data.
result = await asyncio.gather(
ws.send_bytes(b"\x00\x01"),
ws.receive(),
)

assert result[1].type == aiohttp.WSMsgType.BINARY
assert result[1].data == b"\x00\x01"


async def test_webrtc_frame_type_ping_pong(
local_frigate: Any,
hass_client: Any,
) -> None:
"""Test JSMPEG proxying handles ping-pong."""

authenticated_hass_client = await hass_client()

async with authenticated_hass_client.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/webrtc/front_door"
) as ws:
await ws.ping()

# Push some data through after the ping.
result = await asyncio.gather(
ws.send_bytes(b"\x00\x01"),
ws.receive(),
)
assert result[1].type == aiohttp.WSMsgType.BINARY
assert result[1].data == b"\x00\x01"


async def test_webrtc_connection_reset(
local_frigate: Any,
hass_client: Any,
) -> None:
"""Test JSMPEG proxying handles connection resets."""

# Tricky: This test is intended to test a ConnectionResetError to the
# Frigate server, which is the _second_ call to send*. The first call (from
# this test) needs to succeed.
real_send_str = views.aiohttp.web.WebSocketResponse.send_str

called_once = False

async def send_str(*args: Any, **kwargs: Any) -> None:
nonlocal called_once
if called_once:
raise ConnectionResetError
else:
called_once = True
return await real_send_str(*args, **kwargs)

authenticated_hass_client = await hass_client()

with patch(
"custom_components.frigate.views.aiohttp.ClientWebSocketResponse.send_str",
new=send_str,
):
async with authenticated_hass_client.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/webrtc/front_door"
) as ws:
await ws.send_str("data")


async def test_ws_proxy_specify_protocol(
local_frigate: Any,
hass_client: Any,
) -> None:
"""Test websocket proxy handles the SEC_WEBSOCKET_PROTOCOL header."""

authenticated_hass_client = await hass_client()

ws = await authenticated_hass_client.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/front_door",
headers={hdrs.SEC_WEBSOCKET_PROTOCOL: "foo,bar"},
)
assert ws
await ws.close()


async def test_ws_proxy_query_string(
local_frigate: Any,
hass_client: Any,
) -> None:
"""Test websocket proxy passes on the querystring."""

authenticated_hass_client = await hass_client()

async with authenticated_hass_client.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/querystring?key=value",
) as ws:
result = await asyncio.gather(
ws.send_str("hello!"),
ws.receive(),
)
assert result[1].type == aiohttp.WSMsgType.TEXT
assert result[1].data == "hello!"


async def test_ws_proxy_bad_instance_id(
local_frigate: Any,
hass_client: Any,
Expand Down

0 comments on commit fff2ea1

Please sign in to comment.