From 188ca9c5305fb887efe48fb5460e5f0149d9a558 Mon Sep 17 00:00:00 2001
From: hello-amal
Date: Thu, 25 Jul 2024 09:18:07 -0700
Subject: [PATCH] Add text-to-speech CLI Tool (#85)

* Add text-to-speech UI

* Don't store s or q in history

* Added newline before input prompt
---
 .pre-commit-config.yaml              |   1 +
 CMakeLists.txt                       |   1 +
 config/text_to_speech_ui_history.txt |  30 ++++
 nodes/text_to_speech.py              |   2 +-
 nodes/text_to_speech_ui.py           | 250 +++++++++++++++++++++++++++
 5 files changed, 283 insertions(+), 1 deletion(-)
 create mode 100644 config/text_to_speech_ui_history.txt
 create mode 100755 nodes/text_to_speech_ui.py

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 5e205d15..544cbb4e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -20,6 +20,7 @@ repos:
       - id: mixed-line-ending
       - id: requirements-txt-fixer
       - id: trailing-whitespace
+        exclude: config/text_to_speech_ui_history.txt
 
   # isort auto-sorts Python imports
   - repo: https://github.com/pycqa/isort
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 15e92a44..4aec4d27 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -42,6 +42,7 @@ install(PROGRAMS
   nodes/move_to_pregrasp.py
   nodes/navigation_camera.py
   nodes/text_to_speech.py
+  nodes/text_to_speech_ui.py
   DESTINATION lib/${PROJECT_NAME}
 )
 
diff --git a/config/text_to_speech_ui_history.txt b/config/text_to_speech_ui_history.txt
new file mode 100644
index 00000000..318b5788
--- /dev/null
+++ b/config/text_to_speech_ui_history.txt
@@ -0,0 +1,30 @@
+###############################################################################
+# Hard-Coded Utterances
+###############################################################################
+# General Introduction
+Hello, my name is Stretch.
+What's your name?
+It's nice to meet you,
+Thank you for inviting me here today.
+Would you like to see the motions I can do?
+I can do several types of motions.
+I can move around, like this.
+I can lengthen or shorten my arm, like this.
+I can rotate my wrist, like this.
+I can open and close my hand, like this.
+You can talk to me and tell me what to do.
+Do you have any questions?
+# General Tray Delivery
+Let me fetch the tray of candy.
+I'm going to move my arm closer to you to give you something.
+I'm going to bring the tray closer to you. Please wait for my cue.
+Please take one candy.
+You can put the candy in your mouth now.
+Enjoy your candy!
+# Prompts Specific to Jane Doe
+Jane, it's nice to see you again.
+Jane, do you see the tray in front of you?
+Jane, please reach inside the tray and take one candy.
+###############################################################################
+# Live Record of History During Runtime
+###############################################################################
diff --git a/nodes/text_to_speech.py b/nodes/text_to_speech.py
index 7ee2b90e..ee9df3a8 100755
--- a/nodes/text_to_speech.py
+++ b/nodes/text_to_speech.py
@@ -127,11 +127,11 @@ def run(self):
                     self.engine.is_slow = msg.is_slow
 
                 # Speak the text
+                self.get_logger().info(f"Saying: {msg.text}")
                 if self.engine._can_say_async:
                     self.engine.say_async(msg.text)
                 else:
                     self.engine.say(msg.text)
-                self.get_logger().info(f"Saying: {msg.text}")
 
 
 def main():
diff --git a/nodes/text_to_speech_ui.py b/nodes/text_to_speech_ui.py
new file mode 100755
index 00000000..e1cfbd73
--- /dev/null
+++ b/nodes/text_to_speech_ui.py
@@ -0,0 +1,250 @@
+#!/usr/bin/env python3
+
+# Standard imports
+import os
+import readline  # Improve interactive input, e.g., up to access history, tab auto-completion.
+import sys
+import threading
+from typing import List, Optional
+
+# Third-party imports
+import rclpy
+from ament_index_python import get_package_share_directory
+from rclpy.node import Node
+
+# Local Imports
+from stretch_web_teleop.msg import TextToSpeech
+
+
+def print_and_flush(message: str):
+    """
+    Print a message and flush the output.
+
+    Parameters
+    ----------
+    message : str
+        The message to print.
+    """
+    print(message)
+    sys.stdout.flush()
+
+
+class HistoryCompleter:
+    """
+    This class enables readline tab auto-completion from the history.
+
+    Adapted from https://pymotw.com/3/readline/
+    """
+
+    def __init__(self):
+        """
+        Initialize the HistoryCompleter.
+        """
+        self.matches = []
+
+    @staticmethod
+    def get_history_items() -> List[str]:
+        """
+        Get the history items.
+
+        Returns
+        -------
+        List[str]
+            The history items.
+        """
+        # get_history_item() indices are one-based, so iterate 1..length inclusive
+        num_items = readline.get_current_history_length() + 1
+        return [readline.get_history_item(i) for i in range(1, num_items)]
+
+    def complete(self, text: str, state: int) -> Optional[str]:
+        """
+        Return the next possible completion for 'text'.
+
+        This is called successively with state == 0, 1, 2, ... until it returns None.
+
+        Parameters
+        ----------
+        text : str
+            The string to complete.
+        state : int
+            The state of the completion.
+
+        Returns
+        -------
+        Optional[str]
+            The next possible completion for 'text'.
+        """
+        response = None
+        if state == 0:
+            history_values = HistoryCompleter.get_history_items()
+            if text:
+                self.matches = sorted(
+                    h for h in history_values if h and h.startswith(text)
+                )
+            else:
+                self.matches = []
+        try:
+            response = self.matches[state]
+        except IndexError:
+            response = None
+        return response
+
+
+class TextToSpeechUserInterfaceNode(Node):
+    """
+    A ROS2 node that provides a user interface for text-to-speech.
+    """
+
+    def __init__(self):
+        """
+        Initialize the TextToSpeechUserInterfaceNode.
+        """
+        # Initialize the node
+        super().__init__("text_to_speech_ui")
+
+        # Create the publisher
+        self.publisher = self.create_publisher(TextToSpeech, "text_to_speech", 1)
+
+    def publish_message(self, message: str):
+        """
+        Publish a message to the text-to-speech topic.
+
+        Parameters
+        ----------
+        message : str
+            The message to publish.
+        """
+        # Create the message
+        msg = TextToSpeech(
+            text=message,
+            is_slow=False,
+            override_behavior=(
+                TextToSpeech.OVERRIDE_BEHAVIOR_INTERRUPT
+                if len(message) == 0
+                else TextToSpeech.OVERRIDE_BEHAVIOR_QUEUE
+            ),
+        )
+
+        # Publish the message
+        self.publisher.publish(msg)
+
+    def run(self):
+        """
+        Create the user interface for the text-to-speech node.
+        """
+        # Create the input prompt
+        print_and_flush(
+            "****************************************************************"
+        )
+        print_and_flush("Instructions:")
+        print_and_flush(" Type a message to convert to speech.")
+        print_and_flush(" Press S to stop the current message.")
+        print_and_flush(" Press Q to exit and stop the current message.")
+        print_and_flush(" Press Ctrl-C to exit without stopping the current message")
+        print_and_flush(
+            "****************************************************************"
+        )
+
+        # Get the user input
+        while rclpy.ok():
+            # Get the user input
+            message = input("\nMessage (S to stop, Q to exit): ").strip()
+
+            # Process the special 1-character commands
+            if len(message) == 0:
+                continue
+            elif len(message) == 1:
+                if message.upper() == "Q":
+                    self.publish_message("")
+                    # Keep the command itself out of the saved history: drop the
+                    # most recent entry (remove_history_item is zero-based).
+                    readline.remove_history_item(
+                        readline.get_current_history_length() - 1
+                    )
+                    raise KeyboardInterrupt
+                elif message.upper() == "S":
+                    # Stop the current message
+                    self.publish_message("")
+                    # Likewise, do not record the stop command in the history.
+                    readline.remove_history_item(
+                        readline.get_current_history_length() - 1
+                    )
+                    continue
+
+            # Publish the message
+            self.publish_message(message)
+
+
+def spin(node: Node, executor: rclpy.executors.Executor):
+    """
+    Spin the node in the background.
+
+    Parameters
+    ----------
+    node : Node
+        The node to spin.
+    executor : rclpy.executors.Executor
+        The executor to spin.
+    """
+    try:
+        rclpy.spin(node, executor)
+    except rclpy.executors.ExternalShutdownException:
+        pass
+
+
+if __name__ == "__main__":
+    # Configure the GNU readline module for better interactive input
+    history_filename = "text_to_speech_ui_history.txt"
+    config_share_dir = os.path.join(
+        get_package_share_directory("stretch_web_teleop"),
+        "config",
+    )
+    config_src_dir = os.path.expanduser("~/ament_ws/src/stretch_web_teleop/config")
+    readline.read_history_file(os.path.join(config_share_dir, history_filename))
+    readline.set_completer(HistoryCompleter().complete)
+    readline.parse_and_bind("tab: complete")
+    readline.set_completer_delims("")  # Match the entire string, not individual words
+
+    # Initialize the node
+    rclpy.init()
+    node = TextToSpeechUserInterfaceNode()
+    print_and_flush("Initialized the text-to-speech user interface node.")
+
+    # Spin in the background, as the node initializes
+    executor = rclpy.executors.SingleThreadedExecutor()
+    spin_thread = threading.Thread(
+        target=spin,
+        args=(node,),
+        kwargs={"executor": executor},
+        daemon=True,
+    )
+    spin_thread.start()
+
+    # Run the node
+    try:
+        node.run()
+    except (KeyboardInterrupt, EOFError):
+        # input() raises EOFError on Ctrl-D; treat it like Ctrl-C so the node is
+        # shut down and the history below is still saved instead of crashing.
+        node.destroy_node()
+        try:
+            rclpy.shutdown()
+        except rclpy._rclpy_pybind11.RCLError:
+            pass
+    print("")
+
+    # Save the history
+    readline.write_history_file(os.path.join(config_share_dir, history_filename))
+    print_and_flush(f"Saved the history to {config_share_dir}")
+    if os.path.isdir(config_src_dir):
+        readline.write_history_file(os.path.join(config_src_dir, history_filename))
+        print_and_flush(f"Saved the history to {config_src_dir}")
+    else:
+        print_and_flush(
+            f"Could not save the history to {config_src_dir} . Please manually copy it "
+            f"from {config_share_dir} to {config_src_dir}"
+        )
+
+    # Spin in the foreground
+    spin_thread.join()
+    print_and_flush("Cleanly terminated.")