Skip to content

Commit

Permalink
Remove triangulate functionality from RecordingSession
Browse files Browse the repository at this point in the history
  • Loading branch information
roomrys committed Oct 13, 2023
1 parent 06dff9f commit 4706fb2
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 394 deletions.
228 changes: 0 additions & 228 deletions sleap/io/cameras.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,234 +572,6 @@ def get_videos_from_selected_cameras(

return videos

def get_all_views_at_frame(
self,
frame_idx,
cams_to_include: Optional[List[Camcorder]] = None,
) -> Dict[Camcorder, "LabeledFrame"]:
"""Get all views at a given frame index.
Args:
frame_idx: Frame index to get views from (0-indexed).
cams_to_include: List of `Camcorder`s to include. Default is all.
Returns:
Dict with `Camcorder` keys and `LabeledFrame` values.
"""

views: Dict[Camcorder, "LabeledFrame"] = {}
videos: Dict[Camcorder, Video] = self.get_videos_from_selected_cameras(
cams_to_include=cams_to_include
)
for cam, video in videos.items():
lfs: List["LabeledFrame"] = self.labels.get((video, [frame_idx]))
if len(lfs) == 0:
logger.debug(
f"No LabeledFrames found for video {video} at {frame_idx}."
)
continue

lf = lfs[0]
if len(lf.instances) == 0:
logger.warning(
f"No Instances found for {lf}."
" There should not be empty LabeledFrames."
)
continue

views[cam] = lf

return views

def get_instances_across_views(
self,
frame_idx: int,
cams_to_include: Optional[List[Camcorder]] = None,
track: Optional["Track"] = None,
require_multiple_views: bool = False,
) -> Dict[Camcorder, "Instance"]:
"""Get all `Instances` accross all views at a given frame index.
Args:
frame_idx: Frame index to get instances from (0-indexed).
cams_to_include: List of `Camcorder`s to include. Default is all.
track: `Track` object used to find instances accross views. Default is None.
require_multiple_views: If True, then raise and error if one or less views
or instances are found.
Returns:
Dict with `Camcorder` keys and `Instances` values.
Raises:
ValueError if require_multiple_view is true and one or less views or
instances are found.
"""

# Get all views at this frame index
views: Dict[Camcorder, "LabeledFrame"] = self.get_all_views_at_frame(
frame_idx=frame_idx,
cams_to_include=cams_to_include,
)

# If not enough views, then raise error
if len(views) <= 1 and require_multiple_views:
raise ValueError(
"One or less views found for frame "
f"{frame_idx} in {self.camera_cluster}."
)

# Find all instance accross all views
instances: Dict[Camcorder, "Instance"] = {}
for cam, lf in views.items():
insts = lf.find(track=track)
if len(insts) > 0:
instances[cam] = insts[0]

# If not enough instances for multiple views, then raise error
if len(instances) <= 1 and require_multiple_views:
raise ValueError(
"One or less instances found for frame "
f"{frame_idx} in {self.camera_cluster}."
)

return instances

def calculate_excluded_views(
self,
instances: Dict[Camcorder, "Instance"],
) -> Tuple[str]:
"""Get excluded views from dictionary of `Camcorder` to `Instance`.
Args:
instances: Dict with `Camcorder` key and `Instance` values.
Returns:
Tuple of excluded view names.
"""

# Calculate excluded views from included cameras
cams_excluded = set(self.cameras) - set(instances.keys())
excluded_views = tuple(cam.name for cam in cams_excluded)

return excluded_views

def calculate_reprojected_points(
self, instances: Dict[Camcorder, "Instance"]
) -> Iterator[Tuple["Instance", np.ndarray]]:
"""Triangulate and reproject instance coordinates.
Note that the order of the instances in the list must match the order of the
cameras in the `CameraCluster`, that is why we require instances be passed in as
a dictionary mapping back to its `Camcorder`.
https://github.com/lambdaloop/aniposelib/blob/d03b485c4e178d7cff076e9fe1ac36837db49158/aniposelib/cameras.py#L491
Args:
instances: Dict with `Camcorder` keys and `Instance` values.
Returns:
A zip of the ordered instances and the related reprojected coordinates. Each
element in the coordinates is a numpy array of shape (1, N, 2) where N is
the number of nodes.
"""

# TODO (LM): Support multiple tracks and optimize

excluded_views = self.calculate_excluded_views(instances=instances)
instances_ordered = [instances[cam] for cam in self.cameras if cam in instances]

# Gather instances into M x F x T x N x 2 arrays (require specific order)
# (M = # views, F = # frames = 1, T = # tracks = 1, N = # nodes, 2 = x, y)
inst_coords = np.stack(
[inst.numpy() for inst in instances_ordered], axis=0
) # M x N x 2
inst_coords = np.expand_dims(inst_coords, axis=1) # M x T=1 x N x 2
inst_coords = np.expand_dims(inst_coords, axis=1) # M x F=1 x T=1 x N x 2
points_3d = triangulate(
p2d=inst_coords,
calib=self.camera_cluster,
excluded_views=excluded_views,
) # F=1, T=1, N, 3

# Update the views with the new 3D points
inst_coords_reprojected = reproject(
points_3d, calib=self.camera_cluster, excluded_views=excluded_views
) # M x F=1 x T=1 x N x 2
insts_coords_list: List[np.ndarray] = np.split(
inst_coords_reprojected.squeeze(), inst_coords_reprojected.shape[0], axis=0
) # len(M) of T=1 x N x 2

return zip(instances_ordered, insts_coords_list)

def update_instances(self, instances: Dict[Camcorder, "Instance"]):
"""Triangulate, reproject, and update coordinates of `Instances`.
Args:
instances: Dict with `Camcorder` keys and `Instance` values.
Returns:
None
"""

# Triangulate and reproject instance coordinates.
instances_and_coords: Iterator[
Tuple["Instance", np.ndarray]
] = self.calculate_reprojected_points(instances)

# Update the instance coordinates.
for inst, inst_coord in instances_and_coords:
inst.update_points(
inst_coord[0], exclude_complete=True
) # inst_coord is (1, N, 2)

def update_views(
self,
frame_idx: int,
cams_to_include: Optional[List[Camcorder]] = None,
track: Optional["Track"] = None,
):
"""Update the views of the `RecordingSession`.
Args:
frame_idx: Frame index to update (0-indexed).
cams_to_include: List of `Camcorder`s to include. Default is all.
track: `Track` object used to find instances accross views for updating.
Returns:
None
"""

# If not enough `Camcorder`s available/specified, then return
if (cams_to_include is not None and len(cams_to_include) <= 1) or (
len(self.videos) <= 1
):
logger.warning(
"One or less cameras available. "
"Multiple cameras needed to triangulate. "
"Skipping triangulation and reprojection."
)
return

# Get all instances accross views at this frame index
try:
instances: Dict[Camcorder, "Instance"] = self.get_instances_across_views(
frame_idx,
cams_to_include=cams_to_include,
track=track,
require_multiple_views=True,
)
except ValueError:
# If not enough views or instances, then return
logger.warning(
"One or less instances found for frame "
f"{frame_idx} in {self.camera_cluster}. "
"Multiple instances accross multiple views needed to triangulate. "
"Skipping triangulation and reprojection."
)
return

# Update the instance coordinates
self.update_instances(instances)

def __attrs_post_init__(self):
self.camera_cluster.add_session(self)

Expand Down
Loading

0 comments on commit 4706fb2

Please sign in to comment.