Skip to content

Commit

Permalink
Fix aiohttp connection reset errors (#15577)
Browse files Browse the repository at this point in the history
* Fix aiohttp connection reset errors

* Update aiohttp_client.py

* Update aiohttp_client.py

* Update __init__.py

* Update mjpeg.py

* Update mjpeg.py

* Update ffmpeg.py

* Update ffmpeg.py

* Update ffmpeg.py

* Update proxy.py

* Update __init__.py

* Update aiohttp_client.py

* Update aiohttp_client.py

* Update proxy.py

* Update proxy.py

* Fix await inside coroutine

* Fix async syntax

* Lint
  • Loading branch information
pvizeli authored and balloob committed Jul 23, 2018
1 parent 8213b14 commit f3dfc43
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 68 deletions.
50 changes: 19 additions & 31 deletions homeassistant/components/camera/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,32 +301,23 @@ async def write_to_mjpeg_stream(img_bytes):

last_image = None

try:
while True:
img_bytes = await self.async_camera_image()
if not img_bytes:
break

if img_bytes and img_bytes != last_image:
await write_to_mjpeg_stream(img_bytes)

# Chrome seems to always ignore first picture,
# print it twice.
if last_image is None:
await write_to_mjpeg_stream(img_bytes)
while True:
img_bytes = await self.async_camera_image()
if not img_bytes:
break

last_image = img_bytes
if img_bytes and img_bytes != last_image:
await write_to_mjpeg_stream(img_bytes)

await asyncio.sleep(interval)
# Chrome seems to always ignore first picture,
# print it twice.
if last_image is None:
await write_to_mjpeg_stream(img_bytes)
last_image = img_bytes

except asyncio.CancelledError:
_LOGGER.debug("Stream closed by frontend.")
response = None
raise
await asyncio.sleep(interval)

finally:
if response is not None:
await response.write_eof()
return response

async def handle_async_mjpeg_stream(self, request):
"""Serve an HTTP MJPEG stream from the camera.
Expand Down Expand Up @@ -409,10 +400,9 @@ async def get(self, request, entity_id):
request.query.get('token') in camera.access_tokens)

if not authenticated:
return web.Response(status=401)
raise web.HTTPUnauthorized()

response = await self.handle(request, camera)
return response
return await self.handle(request, camera)

async def handle(self, request, camera):
"""Handle the camera request."""
Expand All @@ -435,7 +425,7 @@ async def handle(self, request, camera):
return web.Response(body=image,
content_type=camera.content_type)

return web.Response(status=500)
raise web.HTTPInternalServerError()


class CameraMjpegStream(CameraView):
Expand All @@ -448,19 +438,17 @@ async def handle(self, request, camera):
"""Serve camera stream, possibly with interval."""
interval = request.query.get('interval')
if interval is None:
await camera.handle_async_mjpeg_stream(request)
return
return await camera.handle_async_mjpeg_stream(request)

try:
# Compose camera stream from stills
interval = float(request.query.get('interval'))
if interval < MIN_STREAM_INTERVAL:
raise ValueError("Stream interval must be be > {}"
.format(MIN_STREAM_INTERVAL))
await camera.handle_async_still_stream(request, interval)
return
return await camera.handle_async_still_stream(request, interval)
except ValueError:
return web.Response(status=400)
raise web.HTTPBadRequest()


@callback
Expand Down
24 changes: 12 additions & 12 deletions homeassistant/components/camera/ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
})


@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
async def async_setup_platform(hass, config, async_add_devices,
discovery_info=None):
"""Set up a FFmpeg camera."""
if not hass.data[DATA_FFMPEG].async_run_test(config.get(CONF_INPUT)):
return
Expand All @@ -49,30 +49,30 @@ def __init__(self, hass, config):
self._input = config.get(CONF_INPUT)
self._extra_arguments = config.get(CONF_EXTRA_ARGUMENTS)

@asyncio.coroutine
def async_camera_image(self):
async def async_camera_image(self):
"""Return a still image response from the camera."""
from haffmpeg import ImageFrame, IMAGE_JPEG
ffmpeg = ImageFrame(self._manager.binary, loop=self.hass.loop)

image = yield from asyncio.shield(ffmpeg.get_image(
image = await asyncio.shield(ffmpeg.get_image(
self._input, output_format=IMAGE_JPEG,
extra_cmd=self._extra_arguments), loop=self.hass.loop)
return image

@asyncio.coroutine
def handle_async_mjpeg_stream(self, request):
async def handle_async_mjpeg_stream(self, request):
"""Generate an HTTP MJPEG stream from the camera."""
from haffmpeg import CameraMjpeg

stream = CameraMjpeg(self._manager.binary, loop=self.hass.loop)
yield from stream.open_camera(
await stream.open_camera(
self._input, extra_cmd=self._extra_arguments)

yield from async_aiohttp_proxy_stream(
self.hass, request, stream,
'multipart/x-mixed-replace;boundary=ffserver')
yield from stream.close()
try:
return await async_aiohttp_proxy_stream(
self.hass, request, stream,
'multipart/x-mixed-replace;boundary=ffserver')
finally:
await stream.close()

@property
def name(self):
Expand Down
7 changes: 3 additions & 4 deletions homeassistant/components/camera/mjpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,18 @@ def camera_image(self):
with closing(req) as response:
return extract_image_from_mjpeg(response.iter_content(102400))

@asyncio.coroutine
def handle_async_mjpeg_stream(self, request):
async def handle_async_mjpeg_stream(self, request):
"""Generate an HTTP MJPEG stream from the camera."""
# aiohttp don't support DigestAuth -> Fallback
if self._authentication == HTTP_DIGEST_AUTHENTICATION:
yield from super().handle_async_mjpeg_stream(request)
await super().handle_async_mjpeg_stream(request)
return

# connect to stream
websession = async_get_clientsession(self.hass)
stream_coro = websession.get(self._mjpeg_url, auth=self._auth)

yield from async_aiohttp_proxy_web(self.hass, request, stream_coro)
return await async_aiohttp_proxy_web(self.hass, request, stream_coro)

@property
def name(self):
Expand Down
13 changes: 4 additions & 9 deletions homeassistant/components/camera/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ async def handle_async_mjpeg_stream(self, request):
stream_coro = websession.get(url, headers=self._headers)

if not self._stream_opts:
await async_aiohttp_proxy_web(self.hass, request, stream_coro)
return
return await async_aiohttp_proxy_web(
self.hass, request, stream_coro)

response = aiohttp.web.StreamResponse()
response.content_type = ('multipart/x-mixed-replace; '
Expand Down Expand Up @@ -229,15 +229,10 @@ async def write(img_bytes):
_resize_image, image, self._stream_opts)
await write(image)
data = data[jpg_end + 2:]
except asyncio.CancelledError:
_LOGGER.debug("Stream closed by frontend.")
finally:
req.close()
response = None
raise

finally:
if response is not None:
await response.write_eof()
return response

@property
def name(self):
Expand Down
19 changes: 7 additions & 12 deletions homeassistant/helpers/aiohttp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,13 @@ def async_create_clientsession(hass, verify_ssl=True, auto_cleanup=True,
return clientsession


@asyncio.coroutine
@bind_hass
def async_aiohttp_proxy_web(hass, request, web_coro, buffer_size=102400,
timeout=10):
async def async_aiohttp_proxy_web(hass, request, web_coro,
buffer_size=102400, timeout=10):
"""Stream websession request to aiohttp web response."""
try:
with async_timeout.timeout(timeout, loop=hass.loop):
req = yield from web_coro
req = await web_coro

except asyncio.CancelledError:
# The user cancelled the request
Expand All @@ -88,7 +87,7 @@ def async_aiohttp_proxy_web(hass, request, web_coro, buffer_size=102400,
raise HTTPBadGateway() from err

try:
yield from async_aiohttp_proxy_stream(
return await async_aiohttp_proxy_stream(
hass,
request,
req.content,
Expand All @@ -112,19 +111,15 @@ async def async_aiohttp_proxy_stream(hass, request, stream, content_type,
data = await stream.read(buffer_size)

if not data:
await response.write_eof()
break

await response.write(data)

except (asyncio.TimeoutError, aiohttp.ClientError):
# Something went wrong fetching data, close connection gracefully
await response.write_eof()

except asyncio.CancelledError:
# The user closed the connection
# Something went wrong fetching data, closed connection
pass

return response


@callback
def _async_register_clientsession_shutdown(hass, clientsession):
Expand Down

0 comments on commit f3dfc43

Please sign in to comment.