diff --git a/.devcontainer/.vscode-docker/settings.json b/.devcontainer/.vscode-docker/settings.json index aca4cd0..00524e6 100644 --- a/.devcontainer/.vscode-docker/settings.json +++ b/.devcontainer/.vscode-docker/settings.json @@ -1,11 +1,13 @@ { "python.autoComplete.extraPaths": [ + "/home/microstrain/catkin_ws/src/ntrip_client/src", "/home/microstrain/catkin_ws/devel/lib/python3/dist-packages", "/home/microstrain/catkin_ws/devel/lib/python2.7/dist-packages", "/opt/ros/noetic/lib/python3/dist-packages", "/opt/ros/melodic/lib/python2.7/dist-packages" ], "python.analysis.extraPaths": [ + "/home/microstrain/catkin_ws/src/ntrip_client/src", "/home/microstrain/catkin_ws/devel/lib/python3/dist-packages", "/home/microstrain/catkin_ws/devel/lib/python2.7/dist-packages", "/opt/ros/noetic/lib/python3/dist-packages", diff --git a/launch/ntrip_client_launch.py b/launch/ntrip_client_launch.py index 2ea5011..9da5806 100644 --- a/launch/ntrip_client_launch.py +++ b/launch/ntrip_client_launch.py @@ -46,7 +46,14 @@ def generate_launch_description(): 'password': LaunchConfiguration('password'), # Not sure if this will be looked at by other ndoes, but this frame ID will be added to the RTCM messages published by this node - 'rtcm_frame_id': 'odom' + 'rtcm_frame_id': 'odom', + + # Will affect how many times the node will attempt to reconnect before exiting, and how long it will wait in between attempts when a reconnect occurs + 'reconnect_attempt_max': 10, + 'reconnect_attempt_wait_seconds': 5, + + # How many seconds is acceptable in between receiving RTCM. If RTCM is not received for this duration, the node will attempt to reconnect + 'rtcm_timeout_seconds': 4 } ], # Uncomment the following section and replace "/gq7/nmea/sentence" with the topic you are sending NMEA on if it is not the one we requested diff --git a/scripts/ntrip_ros.py b/scripts/ntrip_ros.py index 38b48d3..fd097b0 100755 --- a/scripts/ntrip_ros.py +++ b/scripts/ntrip_ros.py @@ -33,7 +33,10 @@ def __init__(self): ('authenticate', False), ('username', ''), ('password', ''), - ('rtcm_frame_id', 'odom') + ('rtcm_frame_id', 'odom'), + ('reconnect_attempt_max', NTRIPClient.DEFAULT_RECONNECT_ATTEMPT_MAX), + ('reconnect_attempt_wait_seconds', NTRIPClient.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS), + ('rtcm_timeout_seconds', NTRIPClient.DEFAULT_RTCM_TIMEOUT_SECONDS), ] ) @@ -69,6 +72,11 @@ def __init__(self): # Read an optional Frame ID from the config self._rtcm_frame_id = self.get_parameter('rtcm_frame_id').value + # Get some timeout parameters for the NTRIP client + reconnect_attempt_max = self.get_parameter('reconnect_attempt_max').value + reconnect_attempt_wait_seconds = self.get_parameter('reconnect_attempt_wait_seconds').value + rtcm_timeout_seconds = self.get_parameter('rtcm_timeout_seconds').value + # Setup the RTCM publisher self._rtcm_pub = self.create_publisher(RTCM, 'rtcm', 10) @@ -86,6 +94,11 @@ def __init__(self): logdebug=self.get_logger().debug ) + # Set parameters on the client + self._client.reconnect_attempt_max = reconnect_attempt_max + self._client.reconnect_attempt_wait_seconds = reconnect_attempt_wait_seconds + self._client.rtcm_timeout_seconds = rtcm_timeout_seconds + def run(self): # Connect the client if not self._client.connect(): diff --git a/src/ntrip_client/ntrip_client.py b/src/ntrip_client/ntrip_client.py index 974bfab..db9640e 100644 --- a/src/ntrip_client/ntrip_client.py +++ b/src/ntrip_client/ntrip_client.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +import time import base64 import socket import select @@ -14,15 +15,20 @@ ] _SUCCESS_RESPONSES = [ 'ICY 200 OK', - 'HTTP/1.0 200 OK' + 'HTTP/1.0 200 OK', + 'HTTP/1.1 200 OK' ] _UNAUTHORIZED_RESPONSES = [ '401' ] - class NTRIPClient: + # Public constants + DEFAULT_RECONNECT_ATTEMPT_MAX = 10 + DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS = 5 + DEFAULT_RTCM_TIMEOUT_SECONDS = 4 + def __init__(self, host, port, mountpoint, ntrip_version, username, password, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug): # Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS self._logerr = logerr @@ -41,9 +47,6 @@ def __init__(self, host, port, mountpoint, ntrip_version, username, password, lo else: self._basic_credentials = None - # Create a socket object that we will use to connect to the server - self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # Setup some parsers to parse incoming messages self._rtcm_parser = RTCMParser( logerr=logerr, @@ -59,9 +62,28 @@ def __init__(self, host, port, mountpoint, ntrip_version, username, password, lo ) # Setup some state + self._shutdown = False self._connected = False + # Private reconnect info + self._reconnect_attempt_count = 0 + self._nmea_send_failed_count = 0 + self._nmea_send_failed_max = 5 + self._read_zero_bytes_count = 0 + self._read_zero_bytes_max = 5 + self._first_rtcm_received = False + self._recv_rtcm_last_packet_timestamp = 0 + + # Public reconnect info + self.reconnect_attempt_max = self.DEFAULT_RECONNECT_ATTEMPT_MAX + self.reconnect_attempt_wait_seconds = self.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS + self.rtcm_timeout_seconds = self.DEFAULT_RTCM_TIMEOUT_SECONDS + def connect(self): + # Create a socket object that we will use to connect to the server + self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._server_socket.settimeout(5) + # Connect the socket to the server try: self._server_socket.connect((self._host, self._port)) @@ -83,7 +105,7 @@ def connect(self): # Get the response from the server response = '' try: - response = self._server_socket.recv(1024).decode('utf-8') + response = self._server_socket.recv(_CHUNK_SIZE).decode('utf-8') except Exception as e: self._logerr( 'Unable to read response from server at http://{}:{}'.format(self._host, self._port)) @@ -92,7 +114,6 @@ def connect(self): # Properly handle the response if any(success in response for success in _SUCCESS_RESPONSES): - self._server_socket.setblocking(False) self._connected = True # Some debugging hints about the kind of error we received @@ -122,8 +143,36 @@ def connect(self): def disconnect(self): # Disconnect the socket - self._server_socket.close() self._connected = False + try: + self._server_socket.shutdown(socket.SHUT_RDWR) + except Exception as e: + self._logdebug('Encountered exception when shutting down the socket. This can likely be ignored') + self._logdebug('Exception: {}'.format(e)) + try: + self._server_socket.close() + except Exception as e: + self._logdebug('Encountered exception when closing the socket. This can likely be ignored') + self._logdebug('Exception: {}'.format(e)) + + def reconnect(self): + if self._connected: + while not self._shutdown: + self._reconnect_attempt_count += 1 + self.disconnect() + connect_success = self.connect() + if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max: + self._logerr('Reconnect to http://{}:{} failed. Retrying in {} seconds'.format(self._host, self._port, self.reconnect_attempt_wait_seconds)) + time.sleep(self.reconnect_attempt_wait_seconds) + elif self._reconnect_attempt_count >= self.reconnect_attempt_max: + self._reconnect_attempt_count = 0 + raise Exception("Reconnect was attempted {} times, but never succeeded".format(self._reconnect_attempt_count)) + break + elif connect_success: + self._reconnect_attempt_count = 0 + break + else: + self._logdebug('Reconnect called while still connected, ignoring') def send_nmea(self, sentence): if not self._connected: @@ -147,12 +196,25 @@ def send_nmea(self, sentence): except Exception as e: self._logwarn('Unable to send NMEA sentence to server.') self._logwarn('Exception: {}'.format(str(e))) + self._nmea_send_failed_count += 1 + if self._nmea_send_failed_count >= self._nmea_send_failed_max: + self._logwarn("NMEA sentence failed to send to server {} times, restarting".format(self._nmea_send_failed_count)) + self.reconnect() + self._nmea_send_failed_count = 0 + self.send_nmea(sentence) # Try sending the NMEA sentence again + def recv_rtcm(self): if not self._connected: self._logwarn( 'RTCM requested before client was connected, returning empty list') return [] + + # If it has been too long since we received an RTCM packet, reconnect + if time.time() - self.rtcm_timeout_seconds >= self._recv_rtcm_last_packet_timestamp and self._first_rtcm_received: + self._logerr('RTCM data not received for {} seconds, reconnecting'.format(self.rtcm_timeout_seconds)) + self.reconnect() + self._first_rtcm_received = False # Check if there is any data available on the socket read_sockets, _, _ = select.select([self._server_socket], [], [], 0) @@ -163,15 +225,42 @@ def recv_rtcm(self): # Read all available data into a buffer data = b'' while True: - chunk = self._server_socket.recv(_CHUNK_SIZE) - data += chunk - if len(chunk) < _CHUNK_SIZE: + try: + chunk = self._server_socket.recv(_CHUNK_SIZE) + data += chunk + if len(chunk) < _CHUNK_SIZE: + break + except Exception as e: + self._logerr('Error while reading {} bytes from socket'.format(_CHUNK_SIZE)) + if not self._socket_is_open(): + self._logerr('Socket appears to be closed. Reconnecting') + self.reconnect() + return [] break self._logdebug('Read {} bytes'.format(len(data))) + # If 0 bytes were read from the socket even though we were told data is available multiple times, + # it can be safely assumed that we can reconnect as the server has closed the connection + if len(data) == 0: + self._read_zero_bytes_count += 1 + if self._read_zero_bytes_count >= self._read_zero_bytes_max: + self._logwarn('Reconnecting because we received 0 bytes from the socket even though it said there was data available {} times'.format(self._read_zero_bytes_count)) + self.reconnect() + self._read_zero_bytes_count = 0 + return [] + else: + # Looks like we received valid data, so note when the data was received + self._recv_rtcm_last_packet_timestamp = time.time() + self._first_rtcm_received = True + # Send the data to the RTCM parser to parse it return self._rtcm_parser.parse(data) if data else [] + def shutdown(self): + # Set some state, and then disconnect + self._shutdown = True + self.disconnect() + def _form_request(self): if self._ntrip_version != None and self._ntrip_version != '': request_str = 'GET /{} HTTP/1.0\r\nNtrip-Version: {}\r\nUser-Agent: NTRIP ntrip_client_ros\r\n'.format( @@ -183,4 +272,23 @@ def _form_request(self): request_str += 'Authorization: Basic {}\r\n'.format( self._basic_credentials) request_str += '\r\n' - return request_str.encode('utf-8') \ No newline at end of file + return request_str.encode('utf-8') + + def _socket_is_open(self): + try: + # this will try to read bytes without blocking and also without removing them from buffer (peek only) + data = self._server_socket.recv(_CHUNK_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK) + if len(data) == 0: + return False + except BlockingIOError: + return True # socket is open and reading from it would block + except ConnectionResetError: + self._logwarn('Connection reset by peer') + return False # socket was closed for some other reason + except socket.timeout: + return True # timeout likely means that the socket is still open + except Exception as e: + self._logwarn('Socket appears to be closed') + self._logwarn('Exception: {}'.format(e)) + return False + return True \ No newline at end of file