Skip to content

Commit

Permalink
Use bg process to prevent hammering iohub
Browse files Browse the repository at this point in the history
  • Loading branch information
domstoppable committed Nov 25, 2024
1 parent 7cd4703 commit e09cf06
Showing 1 changed file with 170 additions and 61 deletions.
231 changes: 170 additions & 61 deletions psychopy_eyetracker_pupil_labs/pupil_labs/neon/eyetracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,25 @@
# Copyright (C) 2012-2020 iSolver Software Solutions (C) 2021 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).
import logging
import time
from typing import Optional, Dict, Tuple, Union
from dataclasses import dataclass
import multiprocessing as mp
import asyncio

from psychopy.iohub.constants import EyeTrackerConstants
from psychopy.iohub.devices import Computer, Device
from psychopy.iohub.devices.eyetracker import EyeTrackerDevice
from psychopy.iohub.errors import printExceptionDetailsToStdErr
from psychopy.iohub.constants import EventConstants

from pupil_labs.realtime_api.simple import Device as CompanionDevice
from pupil_labs.realtime_api import Device as CompanionDevice
from pupil_labs.real_time_screen_gaze.gaze_mapper import GazeMapper
from pupil_labs.realtime_api import (
receive_gaze_data,
receive_video_frames,
)
from pupil_labs.realtime_api.time_echo import TimeOffsetEstimator


class EyeTracker(EyeTrackerDevice):
Expand Down Expand Up @@ -56,20 +65,15 @@ class EyeTracker(EyeTrackerDevice):
def __init__(self, *args, **kwargs) -> None:
EyeTrackerDevice.__init__(self, *args, **kwargs)

self._device = None
self._time_offset_estimate = None

self._latest_sample = None
self._latest_gaze_position = None
self._actively_recording = False

self._screen_surface = None
self._window_size = None

self._cached_scene = None

self._gaze_mapper = None

self.mapper_process_command_queue = mp.Queue()
self.mapper_output_queue = mp.Queue()
self.mapper_process = None
self.setConnectionState(True)

def trackerTime(self) -> float:
Expand Down Expand Up @@ -114,20 +118,21 @@ def setConnectionState(self, enable: bool) -> None:
:return:
bool: indicates the current connection state to the eye tracking hardware.
"""
if enable and self._device is None:
self._device = CompanionDevice(
self._runtime_settings["companion_address"],
int(self._runtime_settings["companion_port"]),
if enable and self.mapper_process is None:
self.mapper_process = mp.Process(
target=bg_gaze_mapper,
args=(
self._runtime_settings["companion_address"],
int(self._runtime_settings["companion_port"]),
self.mapper_process_command_queue,
self.mapper_output_queue
)
)
self.mapper_process.start()

calibration = self._device.get_calibration()
self._gaze_mapper = GazeMapper(calibration)

self._time_offset_estimate = self._device.estimate_time_offset()

elif not enable and self._device is not None:
self._device.close()
self._device = None
elif not enable and self.mapper_process is not None:
self.mapper_process_command_queue.put(StopMessage())
self.mapper_process = None

def isConnected(self) -> bool:
"""isConnected returns whether the ioHub EyeTracker Device is connected
Expand All @@ -141,7 +146,7 @@ def isConnected(self) -> bool:
bool: True = the eye tracking hardware is connected. False otherwise.
"""
return self._device is not None
return self.mapper_process is not None

def runSetupProcedure(self, calibration_args: Optional[Dict] = None) -> int:
"""
Expand Down Expand Up @@ -189,14 +194,7 @@ def setRecordingState(self, should_be_recording: bool) -> bool:
if not self.isConnected():
return False

if should_be_recording:
self._device.recording_start()
else:
try:
self._device.recording_stop_and_save()
except Exception as exc:
logging.error(f"Failed to stop recording: {exc}")
printExceptionDetailsToStdErr()
self.mapper_process_command_queue.put(RecordMessage(should_be_recording))

self._actively_recording = should_be_recording

Expand Down Expand Up @@ -269,28 +267,21 @@ def _poll(self):
if not self.isConnected():
return

if self._screen_surface is None:
return

logged_time = Computer.getTime()
scene_data = self._device.receive_scene_video_frame(timeout_seconds=0)
if scene_data is not None:
self._cached_scene, _ = scene_data
while not self.mapper_output_queue.empty():
message = self.mapper_output_queue.get()

gaze = self._device.receive_gaze_datum(timeout_seconds=0)
if gaze is not None and self._cached_scene is not None:
surface_map = self._gaze_mapper.process_frame(self._cached_scene, gaze)
for surface_gaze in surface_map.mapped_gaze[self._screen_surface.uid]:
gaze_in_pix = [
surface_gaze.x * self._window_size[0],
surface_gaze.y * self._window_size[1],
]
if isinstance(message, MappedGazeMessage):
gaze_in_pix = message.gaze_in_pix
if gaze_in_pix is not None:
gaze_in_display_units = self._eyeTrackerToDisplayCoords(gaze_in_pix)
self._add_gaze_sample(gaze_in_display_units, message.gaze_data, logged_time)

gaze_in_display_units = self._eyeTrackerToDisplayCoords(gaze_in_pix)
self._add_gaze_sample(gaze_in_display_units, gaze, logged_time)
if hasattr(message.gaze_data, "pupil_diameter_left"):
self._add_pupil_sample(message.gaze_data, logged_time)

if hasattr(gaze, 'pupil_diameter_left'):
self._add_pupil_sample(gaze, logged_time)
elif isinstance(message, TimeOffsetMessage):
self._time_offset_estimate = message.offset_value

def _add_gaze_sample(self, surface_gaze, gaze_datum, logged_time):
native_time = gaze_datum.timestamp_unix_seconds
Expand Down Expand Up @@ -425,21 +416,10 @@ def _add_pupil_sample(self, pupil_datum, logged_time):
self._latest_sample = sample

def register_surface(self, tag_verts, window_size):
corrected_verts = {int(tag_id): verts for tag_id, verts in tag_verts.items()}

self._gaze_mapper.clear_surfaces()
self._screen_surface = self._gaze_mapper.add_surface(
corrected_verts,
window_size
)

self._window_size = window_size
self.mapper_process_command_queue.put(SurfaceMessage(tag_verts, window_size))

def send_event(self, event_name, timestamp_ns=None):
if timestamp_ns == 0:
timestamp_ns = None

self._device.send_event(event_name, event_timestamp_unix_ns=timestamp_ns)
self.mapper_process_command_queue.put(EventMessage(event_name, timestamp_ns))

def _psychopyTimeInTrackerTime(self, psychopy_time):
return psychopy_time + self._time_offset_estimate.time_offset_ms.mean / 1000
Expand All @@ -453,3 +433,132 @@ def _close(self):
self.setConnectionState(False)
self.__class__._INSTANCE = None
super()._close()

@dataclass
class StopMessage:
pass

@dataclass
class SurfaceMessage:
tag_verts: list
window_size: list

@dataclass
class RecordMessage:
state: bool

@dataclass
class TimeOffsetMessage:
offset_value: object

@dataclass
class MappedGazeMessage:
gaze_data: object
gaze_in_pix: list

@dataclass
class EventMessage:
event_name: str
timestamp_ns: int


def bg_gaze_mapper(host, port, input_queue, output_queue):
async_mapper = AsyncQueueMapper(host, port, input_queue, output_queue)
asyncio.run(async_mapper.run_tasks())


class AsyncQueueMapper:
def __init__(self, host, port, input_queue, output_queue):
self.host = host
self.port = port
self.input_queue = input_queue
self.output_queue = output_queue
self.stop_event = asyncio.Event()

self.screen_surface = None
self.window_size = [0, 0]

self.device = None
self.gaze_mapper = None

async def run_tasks(self):
self.device = CompanionDevice(self.host, self.port)

calibration = await self.device.get_calibration()
self.gaze_mapper = GazeMapper(calibration)

status = await self.device.get_status()

estimator = TimeOffsetEstimator(self.host, status.phone.time_echo_port)
estimated_offset = await estimator.estimate()

self.output_queue.put(TimeOffsetMessage(estimated_offset))

await asyncio.gather(
self.check_input_queue(),
self.receive_and_queue_scene_data(status),
self.receive_and_queue_gaze_data(status)
)

async def check_input_queue(self):
while not self.stop_event.is_set():
while not self.input_queue.empty():
message = self.input_queue.get()

if isinstance(message, EventMessage):
await self.device.send_event(
message.event_name,
event_timestamp_unix_ns=message.timestamp_ns
)

elif isinstance(message, StopMessage):
self.stop_event.set()
await self.device.close()

elif isinstance(message, SurfaceMessage):
corrected_verts = {int(tag_id): verts for tag_id, verts in message.tag_verts.items()}
self.window_size = message.window_size

self.gaze_mapper.clear_surfaces()
self.screen_surface = self.gaze_mapper.add_surface(
corrected_verts,
message.window_size
)

elif isinstance(message, RecordMessage):
try:
if message.state:
await self.device.recording_start()
else:
await self.device.recording_stop_and_save()
except Exception as exc:
logging.error(f"Failed to change recording state (enabled={message.state}): {exc}")
printExceptionDetailsToStdErr()

await asyncio.sleep(0.001)

async def receive_and_queue_scene_data(self, status):
sensor_world = status.direct_world_sensor()
async for frame in receive_video_frames(sensor_world.url, run_loop=True):
if self.stop_event.is_set():
break

bgr_buffer = frame.bgr_buffer()
self.gaze_mapper.process_scene(bgr_buffer)

async def receive_and_queue_gaze_data(self, status):
sensor_gaze = status.direct_gaze_sensor()
async for gaze_data in receive_gaze_data(sensor_gaze.url, run_loop=True):
if self.stop_event.is_set():
break

surface_map = self.gaze_mapper.process_gaze(gaze_data)
result = None
if surface_map is not None and self.screen_surface.uid in surface_map.mapped_gaze:
for surface_gaze in surface_map.mapped_gaze[self.screen_surface.uid]:
result = [
surface_gaze.x * self.window_size[0],
surface_gaze.y * self.window_size[1],
]

self.output_queue.put(MappedGazeMessage(gaze_data, result))

0 comments on commit e09cf06

Please sign in to comment.