Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROS automatic reconnect #18

Merged
merged 3 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .devcontainer/.vscode-docker/settings.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
{
"python.autoComplete.extraPaths": [
"/home/microstrain/catkin_ws/src/ntrip_client/src",
"/home/microstrain/catkin_ws/devel/lib/python3/dist-packages",
"/home/microstrain/catkin_ws/devel/lib/python2.7/dist-packages",
"/opt/ros/noetic/lib/python3/dist-packages",
"/opt/ros/melodic/lib/python2.7/dist-packages"
],
"python.analysis.extraPaths": [
"/home/microstrain/catkin_ws/src/ntrip_client/src",
"/home/microstrain/catkin_ws/devel/lib/python3/dist-packages",
"/home/microstrain/catkin_ws/devel/lib/python2.7/dist-packages",
"/opt/ros/noetic/lib/python3/dist-packages",
Expand Down
9 changes: 8 additions & 1 deletion launch/ntrip_client.launch
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,16 @@
<param name="username" value="$(arg username)" />
<param name="password" value="$(arg password)" />

<!-- Not sure if this will be looked at, but this frame ID will be added to the RTCM messages publishe by this node -->
<!-- Not sure if this will be looked at, but this frame ID will be added to the RTCM messages published by this node -->
<param name="rtcm_frame_id" value="odom" />

<!-- Will affect how many times the node will attempt to reconnect before exiting, and how long it will wait in between attempts when a reconnect occurs -->
<param name="reconnect_attempt_max" value="10" />
<param name="reconnect_attempt_wait_seconds" value="5" />

<!-- How many seconds is acceptable in between receiving RTCM. If RTCM is not received for this duration, the node will attempt to reconnect -->
<param name="rtcm_timeout_seconds" value="4" />

<!-- Uncomment the following section and replace "/gx5/nmea/sentence" with the topic you are sending NMEA on if it is not the one we requested -->
<!--<remap from="/ntrip_client/nmea" to="/gx5/nmea/sentence" />-->
</node>
Expand Down
12 changes: 11 additions & 1 deletion scripts/ntrip_ros.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def __init__(self):
# Read an optional Frame ID from the config
self._rtcm_frame_id = rospy.get_param('~rtcm_frame_id', 'odom')

# Get some timeout parameters for the NTRIP client
reconnect_attempt_max = rospy.get_param('~reconnect_attempt_max', NTRIPClient.DEFAULT_RECONNECT_ATTEMPT_MAX)
reconnect_attempt_wait_seconds = rospy.get_param('~reconnect_attempt_wait_seconds', NTRIPClient.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS)
rtcm_timeout_seconds = rospy.get_param('~rtcm_timeout_seconds', NTRIPClient.DEFAULT_RTCM_TIMEOUT_SECONDS)

# Setup the RTCM publisher
self._rtcm_timer = None
self._rtcm_pub = rospy.Publisher('rtcm', RTCM, queue_size=10)
Expand All @@ -70,6 +75,11 @@ def __init__(self):
logdebug=rospy.logdebug
)

# Set parameters on the client
self._client.reconnect_attempt_max = reconnect_attempt_max
self._client.reconnect_attempt_wait_seconds = reconnect_attempt_wait_seconds
self._client.rtcm_timeout_seconds = rtcm_timeout_seconds

def run(self):
# Setup a shutdown hook
rospy.on_shutdown(self.stop)
Expand All @@ -95,7 +105,7 @@ def stop(self):
self._rtcm_timer.shutdown()
self._rtcm_timer.join()
rospy.loginfo('Disconnecting NTRIP client')
self._client.disconnect()
self._client.shutdown()

def subscribe_nmea(self, nmea):
# Just extract the NMEA from the message, and send it right to the server
Expand Down
130 changes: 119 additions & 11 deletions src/ntrip_client/ntrip_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python

import time
import base64
import socket
import select
Expand All @@ -14,15 +15,20 @@
]
_SUCCESS_RESPONSES = [
'ICY 200 OK',
'HTTP/1.0 200 OK'
'HTTP/1.0 200 OK',
'HTTP/1.1 200 OK'
]
_UNAUTHORIZED_RESPONSES = [
'401'
]


class NTRIPClient:

# Public constants
DEFAULT_RECONNECT_ATTEMPT_MAX = 10
DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS = 5
DEFAULT_RTCM_TIMEOUT_SECONDS = 4

def __init__(self, host, port, mountpoint, ntrip_version, username, password, logerr=logging.error, logwarn=logging.warning, loginfo=logging.info, logdebug=logging.debug):
# Bit of a strange pattern here, but save the log functions so we can be agnostic of ROS
self._logerr = logerr
Expand All @@ -41,9 +47,6 @@ def __init__(self, host, port, mountpoint, ntrip_version, username, password, lo
else:
self._basic_credentials = None

# Create a socket object that we will use to connect to the server
self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Setup some parsers to parse incoming messages
self._rtcm_parser = RTCMParser(
logerr=logerr,
Expand All @@ -59,9 +62,28 @@ def __init__(self, host, port, mountpoint, ntrip_version, username, password, lo
)

# Setup some state
self._shutdown = False
self._connected = False

# Private reconnect info
self._reconnect_attempt_count = 0
self._nmea_send_failed_count = 0
self._nmea_send_failed_max = 5
self._read_zero_bytes_count = 0
self._read_zero_bytes_max = 5
self._first_rtcm_received = False
self._recv_rtcm_last_packet_timestamp = 0

# Public reconnect info
self.reconnect_attempt_max = self.DEFAULT_RECONNECT_ATTEMPT_MAX
self.reconnect_attempt_wait_seconds = self.DEFAULT_RECONNECT_ATEMPT_WAIT_SECONDS
self.rtcm_timeout_seconds = self.DEFAULT_RTCM_TIMEOUT_SECONDS

def connect(self):
# Create a socket object that we will use to connect to the server
self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._server_socket.settimeout(5)

# Connect the socket to the server
try:
self._server_socket.connect((self._host, self._port))
Expand All @@ -83,7 +105,7 @@ def connect(self):
# Get the response from the server
response = ''
try:
response = self._server_socket.recv(1024).decode('utf-8')
response = self._server_socket.recv(_CHUNK_SIZE).decode('utf-8')
except Exception as e:
self._logerr(
'Unable to read response from server at http://{}:{}'.format(self._host, self._port))
Expand All @@ -92,7 +114,6 @@ def connect(self):

# Properly handle the response
if any(success in response for success in _SUCCESS_RESPONSES):
self._server_socket.setblocking(False)
self._connected = True

# Some debugging hints about the kind of error we received
Expand Down Expand Up @@ -122,8 +143,36 @@ def connect(self):

def disconnect(self):
# Disconnect the socket
self._server_socket.close()
self._connected = False
try:
self._server_socket.shutdown(socket.SHUT_RDWR)
except Exception as e:
self._logdebug('Encountered exception when shutting down the socket. This can likely be ignored')
self._logdebug('Exception: {}'.format(e))
try:
self._server_socket.close()
except Exception as e:
self._logdebug('Encountered exception when closing the socket. This can likely be ignored')
self._logdebug('Exception: {}'.format(e))

def reconnect(self):
if self._connected:
while not self._shutdown:
self._reconnect_attempt_count += 1
self.disconnect()
connect_success = self.connect()
if not connect_success and self._reconnect_attempt_count < self.reconnect_attempt_max:
self._logerr('Reconnect to http://{}:{} failed. Retrying in {} seconds'.format(self._host, self._port, self.reconnect_attempt_wait_seconds))
time.sleep(self.reconnect_attempt_wait_seconds)
elif self._reconnect_attempt_count >= self.reconnect_attempt_max:
self._reconnect_attempt_count = 0
raise Exception("Reconnect was attempted {} times, but never succeeded".format(self._reconnect_attempt_count))
break
elif connect_success:
self._reconnect_attempt_count = 0
break
else:
self._logdebug('Reconnect called while still connected, ignoring')

def send_nmea(self, sentence):
if not self._connected:
Expand All @@ -147,12 +196,25 @@ def send_nmea(self, sentence):
except Exception as e:
self._logwarn('Unable to send NMEA sentence to server.')
self._logwarn('Exception: {}'.format(str(e)))
self._nmea_send_failed_count += 1
if self._nmea_send_failed_count >= self._nmea_send_failed_max:
self._logwarn("NMEA sentence failed to send to server {} times, restarting".format(self._nmea_send_failed_count))
self.reconnect()
self._nmea_send_failed_count = 0
self.send_nmea(sentence) # Try sending the NMEA sentence again


def recv_rtcm(self):
if not self._connected:
self._logwarn(
'RTCM requested before client was connected, returning empty list')
return []

# If it has been too long since we received an RTCM packet, reconnect
if time.time() - self.rtcm_timeout_seconds >= self._recv_rtcm_last_packet_timestamp and self._first_rtcm_received:
self._logerr('RTCM data not received for {} seconds, reconnecting'.format(self.rtcm_timeout_seconds))
self.reconnect()
self._first_rtcm_received = False

# Check if there is any data available on the socket
read_sockets, _, _ = select.select([self._server_socket], [], [], 0)
Expand All @@ -163,15 +225,42 @@ def recv_rtcm(self):
# Read all available data into a buffer
data = b''
while True:
chunk = self._server_socket.recv(_CHUNK_SIZE)
data += chunk
if len(chunk) < _CHUNK_SIZE:
try:
chunk = self._server_socket.recv(_CHUNK_SIZE)
data += chunk
if len(chunk) < _CHUNK_SIZE:
break
except Exception as e:
self._logerr('Error while reading {} bytes from socket'.format(_CHUNK_SIZE))
if not self._socket_is_open():
self._logerr('Socket appears to be closed. Reconnecting')
self.reconnect()
return []
break
self._logdebug('Read {} bytes'.format(len(data)))

# If 0 bytes were read from the socket even though we were told data is available multiple times,
# it can be safely assumed that we can reconnect as the server has closed the connection
if len(data) == 0:
self._read_zero_bytes_count += 1
if self._read_zero_bytes_count >= self._read_zero_bytes_max:
self._logwarn('Reconnecting because we received 0 bytes from the socket even though it said there was data available {} times'.format(self._read_zero_bytes_count))
self.reconnect()
self._read_zero_bytes_count = 0
return []
else:
# Looks like we received valid data, so note when the data was received
self._recv_rtcm_last_packet_timestamp = time.time()
self._first_rtcm_received = True

# Send the data to the RTCM parser to parse it
return self._rtcm_parser.parse(data) if data else []

def shutdown(self):
# Set some state, and then disconnect
self._shutdown = True
self.disconnect()

def _form_request(self):
if self._ntrip_version != None and self._ntrip_version != '':
request_str = 'GET /{} HTTP/1.0\r\nNtrip-Version: {}\r\nUser-Agent: NTRIP ntrip_client_ros\r\n'.format(
Expand All @@ -184,3 +273,22 @@ def _form_request(self):
self._basic_credentials)
request_str += '\r\n'
return request_str.encode('utf-8')

def _socket_is_open(self):
try:
# this will try to read bytes without blocking and also without removing them from buffer (peek only)
data = self._server_socket.recv(_CHUNK_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK)
if len(data) == 0:
return False
except BlockingIOError:
return True # socket is open and reading from it would block
except ConnectionResetError:
self._logwarn('Connection reset by peer')
return False # socket was closed for some other reason
except socket.timeout:
return True # timeout likely means that the socket is still open
except Exception as e:
self._logwarn('Socket appears to be closed')
self._logwarn('Exception: {}'.format(e))
return False
return True