ROS2 RGBD hand gestures recognition #341

Merged · 4 commits · Oct 18, 2022
167 changes: 167 additions & 0 deletions projects/opendr_ws_2/src/opendr_perception/opendr_perception/rgbd_hand_gesture_recognition_node.py
@@ -0,0 +1,167 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020-2022 OpenDR European Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import os
import cv2
import numpy as np
import torch

import rclpy
from rclpy.node import Node
import message_filters
from sensor_msgs.msg import Image as ROS_Image
from vision_msgs.msg import Classification2D

from opendr_ros2_bridge import ROS2Bridge
from opendr.engine.data import Image
from opendr.perception.multimodal_human_centric import RgbdHandGestureLearner


class RgbdHandGestureNode(Node):

    def __init__(self, input_rgb_image_topic="/kinect2/qhd/image_color_rect",
                 input_depth_image_topic="/kinect2/qhd/image_depth_rect",
                 output_gestures_topic="/opendr/gestures", device="cuda", delay=0.1):
        """
        Creates a ROS2 Node for gesture recognition from RGBD. It is assumed that the following drivers have been
        installed: https://github.com/OpenKinect/libfreenect2 and https://github.com/code-iai/iai_kinect2.
        :param input_rgb_image_topic: Topic from which we are reading the input RGB image
        :type input_rgb_image_topic: str
        :param input_depth_image_topic: Topic from which we are reading the input depth image
        :type input_depth_image_topic: str
        :param output_gestures_topic: Topic to which we are publishing the predicted gesture class
        :type output_gestures_topic: str
        :param device: Device on which we are running inference ('cpu' or 'cuda')
        :type device: str
        :param delay: The delay (in seconds) within which the RGB and depth messages can be synchronized
        :type delay: float
        """
        super().__init__("rgbd_hand_gesture_recognition_node")

        self.gesture_publisher = self.create_publisher(Classification2D, output_gestures_topic, 1)

        image_sub = message_filters.Subscriber(self, ROS_Image, input_rgb_image_topic, qos_profile=1)
        depth_sub = message_filters.Subscriber(self, ROS_Image, input_depth_image_topic, qos_profile=1)
        # Synchronize the RGB and depth image topics
        ts = message_filters.ApproximateTimeSynchronizer([image_sub, depth_sub], queue_size=10, slop=delay)
        ts.registerCallback(self.callback)

        self.bridge = ROS2Bridge()

        # Initialize the gesture recognition learner and download the pretrained model if needed
        self.gesture_learner = RgbdHandGestureLearner(n_class=16, architecture="mobilenet_v2", device=device)
        model_path = './mobilenet_v2'
        if not os.path.exists(model_path):
            self.gesture_learner.download(path=model_path)
        self.gesture_learner.load(path=model_path)

        # Mean and std used for preprocessing, based on the HANDS dataset
        self.mean = np.asarray([0.485, 0.456, 0.406, 0.0303]).reshape(1, 1, 4)
        self.std = np.asarray([0.229, 0.224, 0.225, 0.0353]).reshape(1, 1, 4)

        self.get_logger().info("RGBD gesture recognition node started!")

    def callback(self, rgb_data, depth_data):
        """
        Callback that processes the input data and publishes to the corresponding topics.
        :param rgb_data: input RGB image message
        :type rgb_data: sensor_msgs.msg.Image
        :param depth_data: input depth image message
        :type depth_data: sensor_msgs.msg.Image
        """
        # Convert sensor_msgs.msg.Image into OpenDR Image and preprocess
        rgb_image = self.bridge.from_ros_image(rgb_data, encoding='bgr8')
        depth_data.encoding = 'mono16'
        depth_image = self.bridge.from_ros_image_to_depth(depth_data, encoding='mono16')
        img = self.preprocess(rgb_image, depth_image)

        # Run gesture recognition
        gesture_class = self.gesture_learner.infer(img)

        # Publish results
        ros_gesture = self.bridge.from_category_to_rosclass(gesture_class, self.get_clock().now().to_msg())
        self.gesture_publisher.publish(ros_gesture)

    def preprocess(self, rgb_image, depth_image):
        """
        Preprocesses rgb_image and depth_image and concatenates them into a single 4-channel image.
        :param rgb_image: input RGB image
        :type rgb_image: engine.data.Image
        :param depth_image: input depth image
        :type depth_image: engine.data.Image
        :return: preprocessed image
        :rtype: engine.data.Image
        """
        rgb_image = rgb_image.convert(format='channels_last') / (2**8 - 1)
        depth_image = depth_image.convert(format='channels_last') / (2**16 - 1)

        # Resize the images to 224x224
        rgb_image = cv2.resize(rgb_image, (224, 224))
        depth_image = cv2.resize(depth_image, (224, 224))

        # Concatenate and standardize
        img = np.concatenate([rgb_image, np.expand_dims(depth_image, axis=-1)], axis=-1)
        img = (img - self.mean) / self.std
        img = Image(img, dtype=np.float32)
        return img


def main(args=None):
    rclpy.init(args=args)

    # Default topics are according to the kinectv2 drivers at https://github.com/OpenKinect/libfreenect2
    # and https://github.com/code-iai/iai_kinect2
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_rgb_image_topic", help="Topic name for input RGB image",
                        type=str, default="/kinect2/qhd/image_color_rect")
    parser.add_argument("--input_depth_image_topic", help="Topic name for input depth image",
                        type=str, default="/kinect2/qhd/image_depth_rect")
    parser.add_argument("--output_gestures_topic", help="Topic name for predicted gesture class",
                        type=str, default="/opendr/gestures")
    parser.add_argument("--device", help="Device to use (cpu, cuda)", type=str, default="cuda",
                        choices=["cuda", "cpu"])
    parser.add_argument("--delay", help="The delay (in seconds) with which the RGB message and "
                        "depth message can be synchronized", type=float, default=0.1)

    args = parser.parse_args()

    # Select the device for running inference
    try:
        if args.device == "cuda" and torch.cuda.is_available():
            device = "cuda"
        elif args.device == "cuda":
            print("GPU not found. Using CPU instead.")
            device = "cpu"
        else:
            print("Using CPU")
            device = "cpu"
    except:
        print("Using CPU")
        device = "cpu"

    gesture_node = RgbdHandGestureNode(input_rgb_image_topic=args.input_rgb_image_topic,
                                       input_depth_image_topic=args.input_depth_image_topic,
                                       output_gestures_topic=args.output_gestures_topic, device=device,
                                       delay=args.delay)

    rclpy.spin(gesture_node)

    gesture_node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
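
The node only publishes once it receives a time-synchronized RGB/depth pair. Below is a minimal test sketch, not part of this PR, that publishes synthetic, identically stamped frames to the node's default Kinect v2 topics so the /opendr/gestures output can be exercised without a sensor; the DummyRgbdPublisher name and the qhd frame size are illustrative assumptions.

#!/usr/bin/env python3
# Minimal sketch (not part of this PR): publishes synthetic RGB and depth frames with
# identical timestamps so the ApproximateTimeSynchronizer in the node above can pair them.
import numpy as np
import rclpy
from rclpy.node import Node
from cv_bridge import CvBridge
from sensor_msgs.msg import Image as ROS_Image


class DummyRgbdPublisher(Node):  # hypothetical helper, not provided by OpenDR
    def __init__(self):
        super().__init__("dummy_rgbd_publisher")
        self.bridge = CvBridge()
        self.rgb_pub = self.create_publisher(ROS_Image, "/kinect2/qhd/image_color_rect", 1)
        self.depth_pub = self.create_publisher(ROS_Image, "/kinect2/qhd/image_depth_rect", 1)
        self.create_timer(0.5, self.publish_frames)

    def publish_frames(self):
        stamp = self.get_clock().now().to_msg()
        rgb = np.zeros((540, 960, 3), dtype=np.uint8)    # dummy qhd-sized RGB frame
        depth = np.zeros((540, 960), dtype=np.uint16)    # dummy 16-bit depth frame
        rgb_msg = self.bridge.cv2_to_imgmsg(rgb, encoding="bgr8")
        depth_msg = self.bridge.cv2_to_imgmsg(depth, encoding="mono16")
        # Identical stamps keep the pair inside the synchronizer's slop window (--delay)
        rgb_msg.header.stamp = stamp
        depth_msg.header.stamp = stamp
        self.rgb_pub.publish(rgb_msg)
        self.depth_pub.publish(depth_msg)


def main():
    rclpy.init()
    rclpy.spin(DummyRgbdPublisher())


if __name__ == "__main__":
    main()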
3 changes: 2 additions & 1 deletion projects/opendr_ws_2/src/opendr_perception/setup.py
@@ -29,7 +29,8 @@
            'semantic_segmentation_bisenet = opendr_perception.semantic_segmentation_bisenet_node:main',
            'face_recognition = opendr_perception.face_recognition_node:main',
            'fall_detection = opendr_perception.fall_detection_node:main',
            'video_activity_recognition = opendr_perception.video_activity_recognition_node:main'
            'video_activity_recognition = opendr_perception.video_activity_recognition_node:main',
            'rgbd_hand_gestures_recognition = opendr_perception.rgbd_hand_gesture_recognition_node:main',
        ],
    },
)
@@ -17,9 +17,10 @@
from opendr.engine.target import Pose, BoundingBox, BoundingBoxList, Category

from cv_bridge import CvBridge
from std_msgs.msg import String
from std_msgs.msg import String, Header
from sensor_msgs.msg import Image as ImageMsg
from vision_msgs.msg import Detection2DArray, Detection2D, BoundingBox2D, ObjectHypothesis, ObjectHypothesisWithPose
from vision_msgs.msg import Detection2DArray, Detection2D, BoundingBox2D, ObjectHypothesis, ObjectHypothesisWithPose, \
    Classification2D
from geometry_msgs.msg import Pose2D
from opendr_ros2_messages.msg import OpenDRPose2D, OpenDRPose2DKeypoint

@@ -280,3 +281,41 @@ def to_ros_category_description(self, category):
        result = String()
        result.data = category.description
        return result

    def from_ros_image_to_depth(self, message, encoding='mono16'):
        """
        Converts a ROS2 image message into an OpenDR grayscale depth image.
        :param message: ROS2 image to be converted
        :type message: sensor_msgs.msg.Image
        :param encoding: encoding to be used for the conversion
        :type encoding: str
        :return: OpenDR image
        :rtype: engine.data.Image
        """
        cv_image = self._cv_bridge.imgmsg_to_cv2(message, desired_encoding=encoding)
        cv_image = np.expand_dims(cv_image, axis=-1)
        image = Image(np.asarray(cv_image, dtype=np.uint8))
        return image

    def from_category_to_rosclass(self, prediction, timestamp, source_data=None):
        """
        Converts an OpenDR Category into a Classification2D message with class label, confidence, timestamp and
        the corresponding input.
        :param prediction: classification prediction
        :type prediction: engine.target.Category
        :param timestamp: time stamp for the header message
        :type timestamp: builtin_interfaces.msg.Time
        :param source_data: corresponding input or None
        :return: classification
        :rtype: vision_msgs.msg.Classification2D
        """
        classification = Classification2D()
        classification.header = Header()
        classification.header.stamp = timestamp

        result = ObjectHypothesis()
        result.id = str(prediction.data)
        result.score = prediction.confidence
        classification.results.append(result)
        if source_data is not None:
            classification.source_img = source_data
        return classification
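
For completeness, here is a minimal sketch, an illustrative assumption rather than code from this PR, of a downstream subscriber that consumes the Classification2D messages produced by from_category_to_rosclass on /opendr/gestures; the GestureListener name is hypothetical.

#!/usr/bin/env python3
# Minimal sketch (not part of this PR): subscribes to the gesture topic and logs each
# hypothesis; 'id' carries the class index as a string and 'score' the confidence,
# matching the fields filled in by from_category_to_rosclass above.
import rclpy
from rclpy.node import Node
from vision_msgs.msg import Classification2D


class GestureListener(Node):  # hypothetical helper, not provided by OpenDR
    def __init__(self):
        super().__init__("gesture_listener")
        self.create_subscription(Classification2D, "/opendr/gestures", self.on_gesture, 1)

    def on_gesture(self, msg):
        for hypothesis in msg.results:
            self.get_logger().info(f"gesture class {hypothesis.id}: score {hypothesis.score:.2f}")


def main():
    rclpy.init()
    rclpy.spin(GestureListener())


if __name__ == "__main__":
    main()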